diff --git a/src/proto/flwr/proto/serverappio.proto b/src/proto/flwr/proto/serverappio.proto index c57b48b0aa39..1297d56cf0f2 100644 --- a/src/proto/flwr/proto/serverappio.proto +++ b/src/proto/flwr/proto/serverappio.proto @@ -20,7 +20,6 @@ package flwr.proto; import "flwr/proto/log.proto"; import "flwr/proto/node.proto"; import "flwr/proto/message.proto"; -import "flwr/proto/task.proto"; import "flwr/proto/run.proto"; import "flwr/proto/fab.proto"; @@ -31,12 +30,10 @@ service ServerAppIo { // Return a set of nodes rpc GetNodes(GetNodesRequest) returns (GetNodesResponse) {} - // Create one or more tasks - rpc PushTaskIns(PushTaskInsRequest) returns (PushTaskInsResponse) {} + // Create one or more messages rpc PushMessages(PushInsMessagesRequest) returns (PushInsMessagesResponse) {} - // Get task results - rpc PullTaskRes(PullTaskResRequest) returns (PullTaskResResponse) {} + // Get message results rpc PullMessages(PullResMessagesRequest) returns (PullResMessagesResponse) {} // Get run details @@ -68,21 +65,6 @@ service ServerAppIo { message GetNodesRequest { uint64 run_id = 1; } message GetNodesResponse { repeated Node nodes = 1; } -// PushTaskIns messages -message PushTaskInsRequest { - repeated TaskIns task_ins_list = 1; - uint64 run_id = 2; -} -message PushTaskInsResponse { repeated string task_ids = 2; } - -// PullTaskRes messages -message PullTaskResRequest { - Node node = 1; - repeated string task_ids = 2; - uint64 run_id = 3; -} -message PullTaskResResponse { repeated TaskRes task_res_list = 1; } - // PushMessages messages message PushInsMessagesRequest { repeated Message messages_list = 1; diff --git a/src/py/flwr/proto/serverappio_pb2.py b/src/py/flwr/proto/serverappio_pb2.py index d750c05a1759..9f8c53149b6e 100644 --- a/src/py/flwr/proto/serverappio_pb2.py +++ b/src/py/flwr/proto/serverappio_pb2.py @@ -25,46 +25,37 @@ from flwr.proto import log_pb2 as flwr_dot_proto_dot_log__pb2 from flwr.proto import node_pb2 as flwr_dot_proto_dot_node__pb2 from flwr.proto import message_pb2 as flwr_dot_proto_dot_message__pb2 -from flwr.proto import task_pb2 as flwr_dot_proto_dot_task__pb2 from flwr.proto import run_pb2 as flwr_dot_proto_dot_run__pb2 from flwr.proto import fab_pb2 as flwr_dot_proto_dot_fab__pb2 -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/serverappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/log.proto\x1a\x15\x66lwr/proto/node.proto\x1a\x18\x66lwr/proto/message.proto\x1a\x15\x66lwr/proto/task.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x14\x66lwr/proto/fab.proto\"!\n\x0fGetNodesRequest\x12\x0e\n\x06run_id\x18\x01 \x01(\x04\"3\n\x10GetNodesResponse\x12\x1f\n\x05nodes\x18\x01 \x03(\x0b\x32\x10.flwr.proto.Node\"P\n\x12PushTaskInsRequest\x12*\n\rtask_ins_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.TaskIns\x12\x0e\n\x06run_id\x18\x02 \x01(\x04\"\'\n\x13PushTaskInsResponse\x12\x10\n\x08task_ids\x18\x02 \x03(\t\"V\n\x12PullTaskResRequest\x12\x1e\n\x04node\x18\x01 \x01(\x0b\x32\x10.flwr.proto.Node\x12\x10\n\x08task_ids\x18\x02 \x03(\t\x12\x0e\n\x06run_id\x18\x03 \x01(\x04\"A\n\x13PullTaskResResponse\x12*\n\rtask_res_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.TaskRes\"T\n\x16PushInsMessagesRequest\x12*\n\rmessages_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.Message\x12\x0e\n\x06run_id\x18\x02 \x01(\x04\".\n\x17PushInsMessagesResponse\x12\x13\n\x0bmessage_ids\x18\x01 \x03(\t\"=\n\x16PullResMessagesRequest\x12\x13\n\x0bmessage_ids\x18\x01 \x03(\t\x12\x0e\n\x06run_id\x18\x02 \x01(\x04\"E\n\x17PullResMessagesResponse\x12*\n\rmessages_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.Message\"\x1c\n\x1aPullServerAppInputsRequest\"\x7f\n\x1bPullServerAppInputsResponse\x12$\n\x07\x63ontext\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x02 \x01(\x0b\x32\x0f.flwr.proto.Run\x12\x1c\n\x03\x66\x61\x62\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Fab\"S\n\x1bPushServerAppOutputsRequest\x12\x0e\n\x06run_id\x18\x01 \x01(\x04\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\"\x1e\n\x1cPushServerAppOutputsResponse2\xd5\x08\n\x0bServerAppIo\x12J\n\tCreateRun\x12\x1c.flwr.proto.CreateRunRequest\x1a\x1d.flwr.proto.CreateRunResponse\"\x00\x12G\n\x08GetNodes\x12\x1b.flwr.proto.GetNodesRequest\x1a\x1c.flwr.proto.GetNodesResponse\"\x00\x12P\n\x0bPushTaskIns\x12\x1e.flwr.proto.PushTaskInsRequest\x1a\x1f.flwr.proto.PushTaskInsResponse\"\x00\x12Y\n\x0cPushMessages\x12\".flwr.proto.PushInsMessagesRequest\x1a#.flwr.proto.PushInsMessagesResponse\"\x00\x12P\n\x0bPullTaskRes\x12\x1e.flwr.proto.PullTaskResRequest\x1a\x1f.flwr.proto.PullTaskResResponse\"\x00\x12Y\n\x0cPullMessages\x12\".flwr.proto.PullResMessagesRequest\x1a#.flwr.proto.PullResMessagesResponse\"\x00\x12\x41\n\x06GetRun\x12\x19.flwr.proto.GetRunRequest\x1a\x1a.flwr.proto.GetRunResponse\"\x00\x12\x41\n\x06GetFab\x12\x19.flwr.proto.GetFabRequest\x1a\x1a.flwr.proto.GetFabResponse\"\x00\x12h\n\x13PullServerAppInputs\x12&.flwr.proto.PullServerAppInputsRequest\x1a\'.flwr.proto.PullServerAppInputsResponse\"\x00\x12k\n\x14PushServerAppOutputs\x12\'.flwr.proto.PushServerAppOutputsRequest\x1a(.flwr.proto.PushServerAppOutputsResponse\"\x00\x12\\\n\x0fUpdateRunStatus\x12\".flwr.proto.UpdateRunStatusRequest\x1a#.flwr.proto.UpdateRunStatusResponse\"\x00\x12S\n\x0cGetRunStatus\x12\x1f.flwr.proto.GetRunStatusRequest\x1a .flwr.proto.GetRunStatusResponse\"\x00\x12G\n\x08PushLogs\x12\x1b.flwr.proto.PushLogsRequest\x1a\x1c.flwr.proto.PushLogsResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x1c\x66lwr/proto/serverappio.proto\x12\nflwr.proto\x1a\x14\x66lwr/proto/log.proto\x1a\x15\x66lwr/proto/node.proto\x1a\x18\x66lwr/proto/message.proto\x1a\x14\x66lwr/proto/run.proto\x1a\x14\x66lwr/proto/fab.proto\"!\n\x0fGetNodesRequest\x12\x0e\n\x06run_id\x18\x01 \x01(\x04\"3\n\x10GetNodesResponse\x12\x1f\n\x05nodes\x18\x01 \x03(\x0b\x32\x10.flwr.proto.Node\"T\n\x16PushInsMessagesRequest\x12*\n\rmessages_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.Message\x12\x0e\n\x06run_id\x18\x02 \x01(\x04\".\n\x17PushInsMessagesResponse\x12\x13\n\x0bmessage_ids\x18\x01 \x03(\t\"=\n\x16PullResMessagesRequest\x12\x13\n\x0bmessage_ids\x18\x01 \x03(\t\x12\x0e\n\x06run_id\x18\x02 \x01(\x04\"E\n\x17PullResMessagesResponse\x12*\n\rmessages_list\x18\x01 \x03(\x0b\x32\x13.flwr.proto.Message\"\x1c\n\x1aPullServerAppInputsRequest\"\x7f\n\x1bPullServerAppInputsResponse\x12$\n\x07\x63ontext\x18\x01 \x01(\x0b\x32\x13.flwr.proto.Context\x12\x1c\n\x03run\x18\x02 \x01(\x0b\x32\x0f.flwr.proto.Run\x12\x1c\n\x03\x66\x61\x62\x18\x03 \x01(\x0b\x32\x0f.flwr.proto.Fab\"S\n\x1bPushServerAppOutputsRequest\x12\x0e\n\x06run_id\x18\x01 \x01(\x04\x12$\n\x07\x63ontext\x18\x02 \x01(\x0b\x32\x13.flwr.proto.Context\"\x1e\n\x1cPushServerAppOutputsResponse2\xb1\x07\n\x0bServerAppIo\x12J\n\tCreateRun\x12\x1c.flwr.proto.CreateRunRequest\x1a\x1d.flwr.proto.CreateRunResponse\"\x00\x12G\n\x08GetNodes\x12\x1b.flwr.proto.GetNodesRequest\x1a\x1c.flwr.proto.GetNodesResponse\"\x00\x12Y\n\x0cPushMessages\x12\".flwr.proto.PushInsMessagesRequest\x1a#.flwr.proto.PushInsMessagesResponse\"\x00\x12Y\n\x0cPullMessages\x12\".flwr.proto.PullResMessagesRequest\x1a#.flwr.proto.PullResMessagesResponse\"\x00\x12\x41\n\x06GetRun\x12\x19.flwr.proto.GetRunRequest\x1a\x1a.flwr.proto.GetRunResponse\"\x00\x12\x41\n\x06GetFab\x12\x19.flwr.proto.GetFabRequest\x1a\x1a.flwr.proto.GetFabResponse\"\x00\x12h\n\x13PullServerAppInputs\x12&.flwr.proto.PullServerAppInputsRequest\x1a\'.flwr.proto.PullServerAppInputsResponse\"\x00\x12k\n\x14PushServerAppOutputs\x12\'.flwr.proto.PushServerAppOutputsRequest\x1a(.flwr.proto.PushServerAppOutputsResponse\"\x00\x12\\\n\x0fUpdateRunStatus\x12\".flwr.proto.UpdateRunStatusRequest\x1a#.flwr.proto.UpdateRunStatusResponse\"\x00\x12S\n\x0cGetRunStatus\x12\x1f.flwr.proto.GetRunStatusRequest\x1a .flwr.proto.GetRunStatusResponse\"\x00\x12G\n\x08PushLogs\x12\x1b.flwr.proto.PushLogsRequest\x1a\x1c.flwr.proto.PushLogsResponse\"\x00\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) _builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'flwr.proto.serverappio_pb2', _globals) if not _descriptor._USE_C_DESCRIPTORS: DESCRIPTOR._loaded_options = None - _globals['_GETNODESREQUEST']._serialized_start=182 - _globals['_GETNODESREQUEST']._serialized_end=215 - _globals['_GETNODESRESPONSE']._serialized_start=217 - _globals['_GETNODESRESPONSE']._serialized_end=268 - _globals['_PUSHTASKINSREQUEST']._serialized_start=270 - _globals['_PUSHTASKINSREQUEST']._serialized_end=350 - _globals['_PUSHTASKINSRESPONSE']._serialized_start=352 - _globals['_PUSHTASKINSRESPONSE']._serialized_end=391 - _globals['_PULLTASKRESREQUEST']._serialized_start=393 - _globals['_PULLTASKRESREQUEST']._serialized_end=479 - _globals['_PULLTASKRESRESPONSE']._serialized_start=481 - _globals['_PULLTASKRESRESPONSE']._serialized_end=546 - _globals['_PUSHINSMESSAGESREQUEST']._serialized_start=548 - _globals['_PUSHINSMESSAGESREQUEST']._serialized_end=632 - _globals['_PUSHINSMESSAGESRESPONSE']._serialized_start=634 - _globals['_PUSHINSMESSAGESRESPONSE']._serialized_end=680 - _globals['_PULLRESMESSAGESREQUEST']._serialized_start=682 - _globals['_PULLRESMESSAGESREQUEST']._serialized_end=743 - _globals['_PULLRESMESSAGESRESPONSE']._serialized_start=745 - _globals['_PULLRESMESSAGESRESPONSE']._serialized_end=814 - _globals['_PULLSERVERAPPINPUTSREQUEST']._serialized_start=816 - _globals['_PULLSERVERAPPINPUTSREQUEST']._serialized_end=844 - _globals['_PULLSERVERAPPINPUTSRESPONSE']._serialized_start=846 - _globals['_PULLSERVERAPPINPUTSRESPONSE']._serialized_end=973 - _globals['_PUSHSERVERAPPOUTPUTSREQUEST']._serialized_start=975 - _globals['_PUSHSERVERAPPOUTPUTSREQUEST']._serialized_end=1058 - _globals['_PUSHSERVERAPPOUTPUTSRESPONSE']._serialized_start=1060 - _globals['_PUSHSERVERAPPOUTPUTSRESPONSE']._serialized_end=1090 - _globals['_SERVERAPPIO']._serialized_start=1093 - _globals['_SERVERAPPIO']._serialized_end=2202 + _globals['_GETNODESREQUEST']._serialized_start=159 + _globals['_GETNODESREQUEST']._serialized_end=192 + _globals['_GETNODESRESPONSE']._serialized_start=194 + _globals['_GETNODESRESPONSE']._serialized_end=245 + _globals['_PUSHINSMESSAGESREQUEST']._serialized_start=247 + _globals['_PUSHINSMESSAGESREQUEST']._serialized_end=331 + _globals['_PUSHINSMESSAGESRESPONSE']._serialized_start=333 + _globals['_PUSHINSMESSAGESRESPONSE']._serialized_end=379 + _globals['_PULLRESMESSAGESREQUEST']._serialized_start=381 + _globals['_PULLRESMESSAGESREQUEST']._serialized_end=442 + _globals['_PULLRESMESSAGESRESPONSE']._serialized_start=444 + _globals['_PULLRESMESSAGESRESPONSE']._serialized_end=513 + _globals['_PULLSERVERAPPINPUTSREQUEST']._serialized_start=515 + _globals['_PULLSERVERAPPINPUTSREQUEST']._serialized_end=543 + _globals['_PULLSERVERAPPINPUTSRESPONSE']._serialized_start=545 + _globals['_PULLSERVERAPPINPUTSRESPONSE']._serialized_end=672 + _globals['_PUSHSERVERAPPOUTPUTSREQUEST']._serialized_start=674 + _globals['_PUSHSERVERAPPOUTPUTSREQUEST']._serialized_end=757 + _globals['_PUSHSERVERAPPOUTPUTSRESPONSE']._serialized_start=759 + _globals['_PUSHSERVERAPPOUTPUTSRESPONSE']._serialized_end=789 + _globals['_SERVERAPPIO']._serialized_start=792 + _globals['_SERVERAPPIO']._serialized_end=1737 # @@protoc_insertion_point(module_scope) diff --git a/src/py/flwr/proto/serverappio_pb2.pyi b/src/py/flwr/proto/serverappio_pb2.pyi index 817103020e5f..c90ea9768bc5 100644 --- a/src/py/flwr/proto/serverappio_pb2.pyi +++ b/src/py/flwr/proto/serverappio_pb2.pyi @@ -7,7 +7,6 @@ import flwr.proto.fab_pb2 import flwr.proto.message_pb2 import flwr.proto.node_pb2 import flwr.proto.run_pb2 -import flwr.proto.task_pb2 import google.protobuf.descriptor import google.protobuf.internal.containers import google.protobuf.message @@ -40,67 +39,6 @@ class GetNodesResponse(google.protobuf.message.Message): def ClearField(self, field_name: typing_extensions.Literal["nodes",b"nodes"]) -> None: ... global___GetNodesResponse = GetNodesResponse -class PushTaskInsRequest(google.protobuf.message.Message): - """PushTaskIns messages""" - DESCRIPTOR: google.protobuf.descriptor.Descriptor - TASK_INS_LIST_FIELD_NUMBER: builtins.int - RUN_ID_FIELD_NUMBER: builtins.int - @property - def task_ins_list(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[flwr.proto.task_pb2.TaskIns]: ... - run_id: builtins.int - def __init__(self, - *, - task_ins_list: typing.Optional[typing.Iterable[flwr.proto.task_pb2.TaskIns]] = ..., - run_id: builtins.int = ..., - ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["run_id",b"run_id","task_ins_list",b"task_ins_list"]) -> None: ... -global___PushTaskInsRequest = PushTaskInsRequest - -class PushTaskInsResponse(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - TASK_IDS_FIELD_NUMBER: builtins.int - @property - def task_ids(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[typing.Text]: ... - def __init__(self, - *, - task_ids: typing.Optional[typing.Iterable[typing.Text]] = ..., - ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["task_ids",b"task_ids"]) -> None: ... -global___PushTaskInsResponse = PushTaskInsResponse - -class PullTaskResRequest(google.protobuf.message.Message): - """PullTaskRes messages""" - DESCRIPTOR: google.protobuf.descriptor.Descriptor - NODE_FIELD_NUMBER: builtins.int - TASK_IDS_FIELD_NUMBER: builtins.int - RUN_ID_FIELD_NUMBER: builtins.int - @property - def node(self) -> flwr.proto.node_pb2.Node: ... - @property - def task_ids(self) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[typing.Text]: ... - run_id: builtins.int - def __init__(self, - *, - node: typing.Optional[flwr.proto.node_pb2.Node] = ..., - task_ids: typing.Optional[typing.Iterable[typing.Text]] = ..., - run_id: builtins.int = ..., - ) -> None: ... - def HasField(self, field_name: typing_extensions.Literal["node",b"node"]) -> builtins.bool: ... - def ClearField(self, field_name: typing_extensions.Literal["node",b"node","run_id",b"run_id","task_ids",b"task_ids"]) -> None: ... -global___PullTaskResRequest = PullTaskResRequest - -class PullTaskResResponse(google.protobuf.message.Message): - DESCRIPTOR: google.protobuf.descriptor.Descriptor - TASK_RES_LIST_FIELD_NUMBER: builtins.int - @property - def task_res_list(self) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[flwr.proto.task_pb2.TaskRes]: ... - def __init__(self, - *, - task_res_list: typing.Optional[typing.Iterable[flwr.proto.task_pb2.TaskRes]] = ..., - ) -> None: ... - def ClearField(self, field_name: typing_extensions.Literal["task_res_list",b"task_res_list"]) -> None: ... -global___PullTaskResResponse = PullTaskResResponse - class PushInsMessagesRequest(google.protobuf.message.Message): """PushMessages messages""" DESCRIPTOR: google.protobuf.descriptor.Descriptor diff --git a/src/py/flwr/proto/serverappio_pb2_grpc.py b/src/py/flwr/proto/serverappio_pb2_grpc.py index 95b03ea3872b..351707b85a60 100644 --- a/src/py/flwr/proto/serverappio_pb2_grpc.py +++ b/src/py/flwr/proto/serverappio_pb2_grpc.py @@ -47,21 +47,11 @@ def __init__(self, channel): request_serializer=flwr_dot_proto_dot_serverappio__pb2.GetNodesRequest.SerializeToString, response_deserializer=flwr_dot_proto_dot_serverappio__pb2.GetNodesResponse.FromString, _registered_method=True) - self.PushTaskIns = channel.unary_unary( - '/flwr.proto.ServerAppIo/PushTaskIns', - request_serializer=flwr_dot_proto_dot_serverappio__pb2.PushTaskInsRequest.SerializeToString, - response_deserializer=flwr_dot_proto_dot_serverappio__pb2.PushTaskInsResponse.FromString, - _registered_method=True) self.PushMessages = channel.unary_unary( '/flwr.proto.ServerAppIo/PushMessages', request_serializer=flwr_dot_proto_dot_serverappio__pb2.PushInsMessagesRequest.SerializeToString, response_deserializer=flwr_dot_proto_dot_serverappio__pb2.PushInsMessagesResponse.FromString, _registered_method=True) - self.PullTaskRes = channel.unary_unary( - '/flwr.proto.ServerAppIo/PullTaskRes', - request_serializer=flwr_dot_proto_dot_serverappio__pb2.PullTaskResRequest.SerializeToString, - response_deserializer=flwr_dot_proto_dot_serverappio__pb2.PullTaskResResponse.FromString, - _registered_method=True) self.PullMessages = channel.unary_unary( '/flwr.proto.ServerAppIo/PullMessages', request_serializer=flwr_dot_proto_dot_serverappio__pb2.PullResMessagesRequest.SerializeToString, @@ -121,28 +111,16 @@ def GetNodes(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') - def PushTaskIns(self, request, context): - """Create one or more tasks - """ - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - def PushMessages(self, request, context): - """Missing associated documentation comment in .proto file.""" - context.set_code(grpc.StatusCode.UNIMPLEMENTED) - context.set_details('Method not implemented!') - raise NotImplementedError('Method not implemented!') - - def PullTaskRes(self, request, context): - """Get task results + """Create one or more messages """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') def PullMessages(self, request, context): - """Missing associated documentation comment in .proto file.""" + """Get message results + """ context.set_code(grpc.StatusCode.UNIMPLEMENTED) context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') @@ -209,21 +187,11 @@ def add_ServerAppIoServicer_to_server(servicer, server): request_deserializer=flwr_dot_proto_dot_serverappio__pb2.GetNodesRequest.FromString, response_serializer=flwr_dot_proto_dot_serverappio__pb2.GetNodesResponse.SerializeToString, ), - 'PushTaskIns': grpc.unary_unary_rpc_method_handler( - servicer.PushTaskIns, - request_deserializer=flwr_dot_proto_dot_serverappio__pb2.PushTaskInsRequest.FromString, - response_serializer=flwr_dot_proto_dot_serverappio__pb2.PushTaskInsResponse.SerializeToString, - ), 'PushMessages': grpc.unary_unary_rpc_method_handler( servicer.PushMessages, request_deserializer=flwr_dot_proto_dot_serverappio__pb2.PushInsMessagesRequest.FromString, response_serializer=flwr_dot_proto_dot_serverappio__pb2.PushInsMessagesResponse.SerializeToString, ), - 'PullTaskRes': grpc.unary_unary_rpc_method_handler( - servicer.PullTaskRes, - request_deserializer=flwr_dot_proto_dot_serverappio__pb2.PullTaskResRequest.FromString, - response_serializer=flwr_dot_proto_dot_serverappio__pb2.PullTaskResResponse.SerializeToString, - ), 'PullMessages': grpc.unary_unary_rpc_method_handler( servicer.PullMessages, request_deserializer=flwr_dot_proto_dot_serverappio__pb2.PullResMessagesRequest.FromString, @@ -329,33 +297,6 @@ def GetNodes(request, metadata, _registered_method=True) - @staticmethod - def PushTaskIns(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/flwr.proto.ServerAppIo/PushTaskIns', - flwr_dot_proto_dot_serverappio__pb2.PushTaskInsRequest.SerializeToString, - flwr_dot_proto_dot_serverappio__pb2.PushTaskInsResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) - @staticmethod def PushMessages(request, target, @@ -383,33 +324,6 @@ def PushMessages(request, metadata, _registered_method=True) - @staticmethod - def PullTaskRes(request, - target, - options=(), - channel_credentials=None, - call_credentials=None, - insecure=False, - compression=None, - wait_for_ready=None, - timeout=None, - metadata=None): - return grpc.experimental.unary_unary( - request, - target, - '/flwr.proto.ServerAppIo/PullTaskRes', - flwr_dot_proto_dot_serverappio__pb2.PullTaskResRequest.SerializeToString, - flwr_dot_proto_dot_serverappio__pb2.PullTaskResResponse.FromString, - options, - channel_credentials, - insecure, - call_credentials, - compression, - wait_for_ready, - timeout, - metadata, - _registered_method=True) - @staticmethod def PullMessages(request, target, diff --git a/src/py/flwr/proto/serverappio_pb2_grpc.pyi b/src/py/flwr/proto/serverappio_pb2_grpc.pyi index 5f353fe504de..b49a535334b3 100644 --- a/src/py/flwr/proto/serverappio_pb2_grpc.pyi +++ b/src/py/flwr/proto/serverappio_pb2_grpc.pyi @@ -21,23 +21,15 @@ class ServerAppIoStub: flwr.proto.serverappio_pb2.GetNodesResponse] """Return a set of nodes""" - PushTaskIns: grpc.UnaryUnaryMultiCallable[ - flwr.proto.serverappio_pb2.PushTaskInsRequest, - flwr.proto.serverappio_pb2.PushTaskInsResponse] - """Create one or more tasks""" - PushMessages: grpc.UnaryUnaryMultiCallable[ flwr.proto.serverappio_pb2.PushInsMessagesRequest, flwr.proto.serverappio_pb2.PushInsMessagesResponse] - - PullTaskRes: grpc.UnaryUnaryMultiCallable[ - flwr.proto.serverappio_pb2.PullTaskResRequest, - flwr.proto.serverappio_pb2.PullTaskResResponse] - """Get task results""" + """Create one or more messages""" PullMessages: grpc.UnaryUnaryMultiCallable[ flwr.proto.serverappio_pb2.PullResMessagesRequest, flwr.proto.serverappio_pb2.PullResMessagesResponse] + """Get message results""" GetRun: grpc.UnaryUnaryMultiCallable[ flwr.proto.run_pb2.GetRunRequest, @@ -92,33 +84,21 @@ class ServerAppIoServicer(metaclass=abc.ABCMeta): """Return a set of nodes""" pass - @abc.abstractmethod - def PushTaskIns(self, - request: flwr.proto.serverappio_pb2.PushTaskInsRequest, - context: grpc.ServicerContext, - ) -> flwr.proto.serverappio_pb2.PushTaskInsResponse: - """Create one or more tasks""" - pass - @abc.abstractmethod def PushMessages(self, request: flwr.proto.serverappio_pb2.PushInsMessagesRequest, context: grpc.ServicerContext, - ) -> flwr.proto.serverappio_pb2.PushInsMessagesResponse: ... - - @abc.abstractmethod - def PullTaskRes(self, - request: flwr.proto.serverappio_pb2.PullTaskResRequest, - context: grpc.ServicerContext, - ) -> flwr.proto.serverappio_pb2.PullTaskResResponse: - """Get task results""" + ) -> flwr.proto.serverappio_pb2.PushInsMessagesResponse: + """Create one or more messages""" pass @abc.abstractmethod def PullMessages(self, request: flwr.proto.serverappio_pb2.PullResMessagesRequest, context: grpc.ServicerContext, - ) -> flwr.proto.serverappio_pb2.PullResMessagesResponse: ... + ) -> flwr.proto.serverappio_pb2.PullResMessagesResponse: + """Get message results""" + pass @abc.abstractmethod def GetRun(self, diff --git a/src/py/flwr/server/superlink/driver/serverappio_servicer.py b/src/py/flwr/server/superlink/driver/serverappio_servicer.py index 062a69f98b05..910320d50568 100644 --- a/src/py/flwr/server/superlink/driver/serverappio_servicer.py +++ b/src/py/flwr/server/superlink/driver/serverappio_servicer.py @@ -64,14 +64,10 @@ PullResMessagesResponse, PullServerAppInputsRequest, PullServerAppInputsResponse, - PullTaskResRequest, - PullTaskResResponse, PushInsMessagesRequest, PushInsMessagesResponse, PushServerAppOutputsRequest, PushServerAppOutputsResponse, - PushTaskInsRequest, - PushTaskInsResponse, ) from flwr.proto.task_pb2 import TaskRes # pylint: disable=E0611 from flwr.server.superlink.ffs.ffs import Ffs @@ -138,57 +134,6 @@ def CreateRun( ) return CreateRunResponse(run_id=run_id) - def PushTaskIns( - self, request: PushTaskInsRequest, context: grpc.ServicerContext - ) -> PushTaskInsResponse: - """Push a set of TaskIns.""" - log(DEBUG, "ServerAppIoServicer.PushTaskIns") - - # Init state - state: LinkState = self.state_factory.state() - - # Abort if the run is not running - abort_if( - request.run_id, - [Status.PENDING, Status.STARTING, Status.FINISHED], - state, - context, - ) - - # Set pushed_at (timestamp in seconds) - pushed_at = now().timestamp() - for task_ins in request.task_ins_list: - task_ins.task.pushed_at = pushed_at - - # Validate request - _raise_if( - validation_error=len(request.task_ins_list) == 0, - request_name="PushTaskIns", - detail="`task_ins_list` must not be empty", - ) - for task_ins in request.task_ins_list: - validation_errors = validate_task_ins_or_res(task_ins) - _raise_if( - validation_error=bool(validation_errors), - request_name="PushTaskIns", - detail=", ".join(validation_errors), - ) - _raise_if( - validation_error=request.run_id != task_ins.run_id, - request_name="PushTaskIns", - detail="`task_ins` has mismatched `run_id`", - ) - - # Store each TaskIns - task_ids: list[Optional[UUID]] = [] - for task_ins in request.task_ins_list: - task_id: Optional[UUID] = state.store_task_ins(task_ins=task_ins) - task_ids.append(task_id) - - return PushTaskInsResponse( - task_ids=[str(task_id) if task_id else "" for task_id in task_ids] - ) - def PushMessages( self, request: PushInsMessagesRequest, context: grpc.ServicerContext ) -> PushInsMessagesResponse: @@ -242,45 +187,6 @@ def PushMessages( ] ) - def PullTaskRes( - self, request: PullTaskResRequest, context: grpc.ServicerContext - ) -> PullTaskResResponse: - """Pull a set of TaskRes.""" - log(DEBUG, "ServerAppIoServicer.PullTaskRes") - - # Init state - state: LinkState = self.state_factory.state() - - # Abort if the run is not running - abort_if( - request.run_id, - [Status.PENDING, Status.STARTING, Status.FINISHED], - state, - context, - ) - - # Convert each task_id str to UUID - task_ids: set[UUID] = {UUID(task_id) for task_id in request.task_ids} - - # Read from state - task_res_list: list[TaskRes] = state.get_task_res(task_ids=task_ids) - - # Validate request - for task_res in task_res_list: - _raise_if( - validation_error=request.run_id != task_res.run_id, - request_name="PullTaskRes", - detail="`task_res` has mismatched `run_id`", - ) - - # Delete the TaskIns/TaskRes pairs if TaskRes is found - task_ins_ids_to_delete = { - UUID(task_res.task.ancestry[0]) for task_res in task_res_list - } - state.delete_tasks(task_ins_ids=task_ins_ids_to_delete) - - return PullTaskResResponse(task_res_list=task_res_list) - def PullMessages( self, request: PullResMessagesRequest, context: grpc.ServicerContext ) -> PullResMessagesResponse: diff --git a/src/py/flwr/server/superlink/driver/serverappio_servicer_test.py b/src/py/flwr/server/superlink/driver/serverappio_servicer_test.py index fdd824acb70a..226d68cdf1a5 100644 --- a/src/py/flwr/server/superlink/driver/serverappio_servicer_test.py +++ b/src/py/flwr/server/superlink/driver/serverappio_servicer_test.py @@ -31,7 +31,6 @@ from flwr.common.serde_test import RecordMaker from flwr.common.typing import RunStatus from flwr.proto.message_pb2 import Message # pylint: disable=E0611 -from flwr.proto.node_pb2 import Node # pylint: disable=E0611 from flwr.proto.run_pb2 import ( # pylint: disable=E0611 UpdateRunStatusRequest, UpdateRunStatusResponse, @@ -41,24 +40,16 @@ GetNodesResponse, PullResMessagesRequest, PullResMessagesResponse, - PullTaskResRequest, - PullTaskResResponse, PushInsMessagesRequest, PushInsMessagesResponse, PushServerAppOutputsRequest, PushServerAppOutputsResponse, - PushTaskInsRequest, - PushTaskInsResponse, ) -from flwr.proto.task_pb2 import TaskIns # pylint: disable=E0611 from flwr.server.superlink.driver.serverappio_grpc import run_serverappio_api_grpc from flwr.server.superlink.driver.serverappio_servicer import _raise_if from flwr.server.superlink.ffs.ffs_factory import FfsFactory from flwr.server.superlink.linkstate.linkstate_factory import LinkStateFactory -from flwr.server.superlink.linkstate.linkstate_test import ( - create_ins_message, - create_task_ins, -) +from flwr.server.superlink.linkstate.linkstate_test import create_ins_message from flwr.server.superlink.utils import _STATUS_TO_MSG # pylint: disable=broad-except @@ -137,21 +128,11 @@ def setUp(self) -> None: request_serializer=GetNodesRequest.SerializeToString, response_deserializer=GetNodesResponse.FromString, ) - self._push_task_ins = self._channel.unary_unary( - "/flwr.proto.ServerAppIo/PushTaskIns", - request_serializer=PushTaskInsRequest.SerializeToString, - response_deserializer=PushTaskInsResponse.FromString, - ) self._push_messages = self._channel.unary_unary( "/flwr.proto.ServerAppIo/PushMessages", request_serializer=PushInsMessagesRequest.SerializeToString, response_deserializer=PushInsMessagesResponse.FromString, ) - self._pull_task_res = self._channel.unary_unary( - "/flwr.proto.ServerAppIo/PullTaskRes", - request_serializer=PullTaskResRequest.SerializeToString, - response_deserializer=PullTaskResResponse.FromString, - ) self._pull_messages = self._channel.unary_unary( "/flwr.proto.ServerAppIo/PullMessages", request_serializer=PullResMessagesRequest.SerializeToString, @@ -225,24 +206,6 @@ def test_get_nodes_not_successful_if_not_running( # Execute & Assert self._assert_get_nodes_not_allowed(run_id) - def test_successful_push_task_ins_if_running(self) -> None: - """Test `PushTaskIns` success.""" - # Prepare - node_id = self.state.create_node(ping_interval=30) - run_id = self.state.create_run("", "", "", {}, ConfigsRecord()) - task_ins = create_task_ins(consumer_node_id=node_id, run_id=run_id) - - # Transition status to running. PushTaskRes is only allowed in running status. - self._transition_run_status(run_id, 2) - request = PushTaskInsRequest(task_ins_list=[task_ins], run_id=run_id) - - # Execute - response, call = self._push_task_ins.with_call(request=request) - - # Assert - assert isinstance(response, PushTaskInsResponse) - assert grpc.StatusCode.OK == call.code() - def test_successful_push_messages_if_running(self) -> None: """Test `PushMessages` success.""" # Prepare @@ -263,37 +226,6 @@ def test_successful_push_messages_if_running(self) -> None: assert isinstance(response, PushInsMessagesResponse) assert grpc.StatusCode.OK == call.code() - def _assert_push_task_ins_not_allowed(self, task_ins: TaskIns, run_id: int) -> None: - """Assert `PushTaskIns` not allowed.""" - run_status = self.state.get_run_status({run_id})[run_id] - request = PushTaskInsRequest(task_ins_list=[task_ins], run_id=run_id) - - with self.assertRaises(grpc.RpcError) as e: - self._push_task_ins.with_call(request=request) - assert e.exception.code() == grpc.StatusCode.PERMISSION_DENIED - assert e.exception.details() == self.status_to_msg[run_status.status] - - @parameterized.expand( - [ - (0,), # Test not successful if RunStatus is pending. - (1,), # Test not successful if RunStatus is starting. - (3,), # Test not successful if RunStatus is finished. - ] - ) # type: ignore - def test_push_task_ins_not_successful_if_not_running( - self, num_transitions: int - ) -> None: - """Test `PushTaskIns` not successful if RunStatus is not running.""" - # Prepare - node_id = self.state.create_node(ping_interval=30) - run_id = self.state.create_run("", "", "", {}, ConfigsRecord()) - task_ins = create_task_ins(consumer_node_id=node_id, run_id=run_id) - - self._transition_run_status(run_id, num_transitions) - - # Execute & Assert - self._assert_push_task_ins_not_allowed(task_ins, run_id) - def _assert_push_ins_messages_not_allowed( self, message: Message, run_id: int ) -> None: @@ -329,21 +261,6 @@ def test_push_ins_messages_not_successful_if_not_running( # Execute & Assert self._assert_push_ins_messages_not_allowed(message_ins, run_id) - def test_pull_task_res_successful_if_running(self) -> None: - """Test `PullTaskRes` success.""" - # Prepare - run_id = self.state.create_run("", "", "", {}, ConfigsRecord()) - # Transition status to running. PushTaskRes is only allowed in running status. - self._transition_run_status(run_id, 2) - request = PullTaskResRequest(task_ids=[], run_id=run_id) - - # Execute - response, call = self._pull_task_res.with_call(request=request) - - # Assert - assert isinstance(response, PullTaskResResponse) - assert grpc.StatusCode.OK == call.code() - def test_successful_pull_messages_if_running(self) -> None: """Test `PullMessages` success.""" # Prepare @@ -359,35 +276,6 @@ def test_successful_pull_messages_if_running(self) -> None: assert isinstance(response, PullResMessagesResponse) assert grpc.StatusCode.OK == call.code() - def _assert_pull_task_res_not_allowed(self, run_id: int) -> None: - """Assert `PullTaskRes` not allowed.""" - run_status = self.state.get_run_status({run_id})[run_id] - request = PullTaskResRequest(node=Node(node_id=0), run_id=run_id) - - with self.assertRaises(grpc.RpcError) as e: - self._pull_task_res.with_call(request=request) - assert e.exception.code() == grpc.StatusCode.PERMISSION_DENIED - assert e.exception.details() == self.status_to_msg[run_status.status] - - @parameterized.expand( - [ - (0,), # Test not successful if RunStatus is pending. - (1,), # Test not successful if RunStatus is starting. - (3,), # Test not successful if RunStatus is finished. - ] - ) # type: ignore - def test_pull_task_res_not_successful_if_not_running( - self, num_transitions: int - ) -> None: - """Test `PullTaskRes` not successful if RunStatus is not running.""" - # Prepare - run_id = self.state.create_run("", "", "", {}, ConfigsRecord()) - - self._transition_run_status(run_id, num_transitions) - - # Execute & Assert - self._assert_pull_task_res_not_allowed(run_id) - def _assert_pull_messages_not_allowed(self, run_id: int) -> None: """Assert `PullMessages` not allowed.""" run_status = self.state.get_run_status({run_id})[run_id]