From 2d038c99732b6b50f8b0ccab8cbdc0d67bc6317f Mon Sep 17 00:00:00 2001
From: porteratzo
Date: Mon, 29 Jan 2024 07:51:31 -0800
Subject: [PATCH] remove max concurrency, add number of actors

---
 ...nterface_201_Exclusive_GPUs_with_Ray.ipynb |  2 --
 openfl/experimental/runtime/local_runtime.py  | 25 +++++++++----------
 2 files changed, 12 insertions(+), 15 deletions(-)

diff --git a/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb b/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb
index a7db92fd78..34e7adcb4c 100644
--- a/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb
+++ b/openfl-tutorials/experimental/Workflow_Interface_201_Exclusive_GPUs_with_Ray.ipynb
@@ -335,8 +335,6 @@
     "# The following is equivalent to\n",
     "# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, **backend='ray'**)\n",
     "local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='ray')\n",
-    "# Use the following for ray_grouped backend\n",
-    "#local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='ray_grouped')\n",
     "print(f'Local runtime collaborators = {local_runtime.collaborators}')"
    ]
   },
diff --git a/openfl/experimental/runtime/local_runtime.py b/openfl/experimental/runtime/local_runtime.py
index cb31ca1423..f5635aea23 100644
--- a/openfl/experimental/runtime/local_runtime.py
+++ b/openfl/experimental/runtime/local_runtime.py
@@ -11,6 +11,7 @@
 import gc
 from openfl.experimental.runtime import Runtime
 from typing import TYPE_CHECKING, Optional
+import math
 
 if TYPE_CHECKING:
     from openfl.experimental.interface import Aggregator, Collaborator, FLSpec
@@ -66,14 +67,13 @@ def ray_call_get(self) -> List[Any]:
         return clones
 
 
-def ray_group_assign(collaborators, collaborators_per_group=3, max_concurrency=1):
+def ray_group_assign(collaborators, number_of_actors=3):
     """
     Assigns collaborators to resource groups which share a CUDA context.
 
     Args:
         collaborators (list): The list of collaborators.
-        collaborators_per_group (int, optional): The number of collaborators per group. Defaults to 3.
-        max_concurrency (int, optional): The maximum concurrency. Defaults to 1.
+        number_of_actors (int, optional): Number of actors to distribute collaborators to. Defaults to 3.
 
     Returns:
         list: A list of GroupMember instances.
@@ -151,7 +151,7 @@ def remote(self, *args, **kwargs):
             return self.f(*args, *kwargs)
 
     collaborator_ray_refs = []
-
+    collaborators_per_group = math.ceil(len(collaborators)/number_of_actors)
     times_called = 0
     # logic to sort collaborators by gpus, if collaborators have the same number of gpu then they are sorted by cpu
     cpu_magnitude = len(str(abs(max([i.num_cpus for i in collaborators]))))
@@ -168,7 +168,7 @@ def remote(self, *args, **kwargs):
         print(f'creating actor with {max_num_cpus}, {max_num_gpus}')
         collaborator_actor = (
             ray.remote(RayGroup)
-            .options(num_cpus=max_num_cpus, num_gpus=max_num_gpus, max_concurrency=max_concurrency)
+            .options(num_cpus=max_num_cpus, num_gpus=max_num_gpus)#, max_concurrency=max_concurrency)
             .remote()
         )
         # add collaborator to actor group
@@ -296,14 +296,13 @@ def some_collaborator_task(self):
                 f"Invalid 'backend' value '{backend}', accepted values are "
                 + "'ray', or 'single_process'"
             )
-        if backend.find("ray") != -1:
+        if backend == "ray":
             if not ray.is_initialized():
                 dh = kwargs.get("dashboard_host", "127.0.0.1")
                 dp = kwargs.get("dashboard_port", 5252)
                 ray.init(dashboard_host=dh, dashboard_port=dp)
-            self.collaborators_per_group = kwargs.get('collaborators_per_group', 3)
-            self.max_concurrency = kwargs.get('max_concurrency', 1)
+            self.number_of_actors = kwargs.get('number_of_actors', 3)
 
         self.backend = backend
         if aggregator is not None:
             self.aggregator = self.__get_aggregator_object(aggregator)
@@ -373,7 +372,7 @@ def __get_collaborator_object(self, collaborators: List) -> Any:
             )
 
         if self.backend == "ray":
-            collaborator_ray_refs = ray_group_assign(collaborators, collaborators_per_group=self.collaborators_per_group, max_concurrency=self.max_concurrency)
+            collaborator_ray_refs = ray_group_assign(collaborators, number_of_actors=self.number_of_actors)
             return collaborator_ray_refs
 
     @property
@@ -528,7 +527,7 @@ def execute_agg_task(self, flspec_obj, f):
         clones = [FLSpec._clones[col] for col in self.selected_collaborators]
 
         self.join_step = False
-        if self.backend.find("ray") != -1:
+        if self.backend == "ray":
             ray_executor = RayExecutor()
             ray_executor.ray_call_put(
                 aggregator, flspec_obj, f.__name__, self.execute_agg_steps, clones
@@ -575,7 +574,7 @@ def execute_collab_task(
         # filter exclude/include attributes for clone
         self.filter_exclude_include(flspec_obj, f, selected_collaborators, **kwargs)
 
-        if self.backend.find("ray") != -1:
+        if self.backend == "ray":
             ray_executor = RayExecutor()
             # set runtime,collab private attributes and metaflowinterface
             for col in selected_collaborators:
@@ -593,14 +592,14 @@ def execute_collab_task(
             clone = FLSpec._clones[collab_name]
             collaborator = self.__collaborators[collab_name]
 
-            if self.backend.find("ray") != -1:
+            if self.backend == "ray":
                 ray_executor.ray_call_put(
                     collaborator, clone, f.__name__, self.execute_collab_steps
                 )
             else:
                 collaborator.execute_func(clone, f.__name__, self.execute_collab_steps)
 
-        if self.backend.find("ray") != -1:
+        if self.backend == "ray":
            clones = ray_executor.ray_call_get()
             FLSpec._clones.update(zip(selected_collaborators, clones))
             clone = clones[0]
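Note for reviewers (not part of the patch): a minimal sketch of the grouping arithmetic this change introduces in `ray_group_assign`. The names `collaborators` and `number_of_actors` come from the diff; the stand-in collaborator list and the explicit slicing are illustrative only, assuming the real code assigns collaborators to actor groups of the computed size.

```python
import math

# Illustrative stand-ins for the collaborator objects handled by ray_group_assign.
collaborators = [f"collaborator_{i}" for i in range(8)]
number_of_actors = 3  # default introduced by this patch

# Same arithmetic as the patched ray_group_assign: group size is rounded up so
# every collaborator lands in one of at most `number_of_actors` groups.
collaborators_per_group = math.ceil(len(collaborators) / number_of_actors)

groups = [
    collaborators[i:i + collaborators_per_group]
    for i in range(0, len(collaborators), collaborators_per_group)
]

print(collaborators_per_group)  # 3
print(len(groups))              # 3 groups of sizes 3, 3, 2
```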
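For the tutorial notebook, usage after this change would look roughly like the sketch below: `number_of_actors` is read from the `LocalRuntime` kwargs (default 3), replacing `collaborators_per_group` and `max_concurrency`. This assumes `aggregator` and `collaborators` are constructed as in the 201 notebook; the value 4 is illustrative.

```python
from openfl.experimental.runtime import LocalRuntime

# Assumes `aggregator` and `collaborators` are built earlier in the notebook.
# With this patch the Ray backend spreads collaborators over `number_of_actors`
# actors instead of using collaborators_per_group / max_concurrency.
local_runtime = LocalRuntime(
    aggregator=aggregator,
    collaborators=collaborators,
    backend="ray",
    number_of_actors=4,  # illustrative; omit to use the default of 3
)
print(f"Local runtime collaborators = {local_runtime.collaborators}")
```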