Skip to content

Commit

Permalink
remove max concurrency, add number of actors
Browse files Browse the repository at this point in the history
  • Loading branch information
porteratzo committed Jan 29, 2024
1 parent 6f5725f commit 2d038c9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,6 @@
"# The following is equivalent to\n",
"# local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, **backend='ray'**)\n",
"local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='ray')\n",
"# Use the following for ray_grouped backend\n",
"#local_runtime = LocalRuntime(aggregator=aggregator, collaborators=collaborators, backend='ray_grouped')\n",
"print(f'Local runtime collaborators = {local_runtime.collaborators}')"
]
},
Expand Down
25 changes: 12 additions & 13 deletions openfl/experimental/runtime/local_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import gc
from openfl.experimental.runtime import Runtime
from typing import TYPE_CHECKING, Optional
import math

if TYPE_CHECKING:
from openfl.experimental.interface import Aggregator, Collaborator, FLSpec
Expand Down Expand Up @@ -66,14 +67,13 @@ def ray_call_get(self) -> List[Any]:
return clones


def ray_group_assign(collaborators, collaborators_per_group=3, max_concurrency=1):
def ray_group_assign(collaborators, number_of_actors=3):
"""
Assigns collaborators to resource groups which share a CUDA context.
Args:
collaborators (list): The list of collaborators.
collaborators_per_group (int, optional): The number of collaborators per group. Defaults to 3.
max_concurrency (int, optional): The maximum concurrency. Defaults to 1.
number_of_actors (int, optional): Number of actors to distribute collaborators to. Defaults to 3.
Returns:
list: A list of GroupMember instances.
Expand Down Expand Up @@ -151,7 +151,7 @@ def remote(self, *args, **kwargs):
return self.f(*args, *kwargs)

collaborator_ray_refs = []

collaborators_per_group = math.ceil(len(collaborators)/number_of_actors)
times_called = 0
# Sort collaborators by number of GPUs; collaborators with the same number of GPUs are then sorted by number of CPUs.
cpu_magnitude = len(str(abs(max([i.num_cpus for i in collaborators]))))
Expand All @@ -168,7 +168,7 @@ def remote(self, *args, **kwargs):
print(f'creating actor with {max_num_cpus}, {max_num_gpus}')
collaborator_actor = (
ray.remote(RayGroup)
.options(num_cpus=max_num_cpus, num_gpus=max_num_gpus, max_concurrency=max_concurrency)
.options(num_cpus=max_num_cpus, num_gpus=max_num_gpus)#, max_concurrency=max_concurrency)
.remote()
)
# add collaborator to actor group
Expand Down Expand Up @@ -296,14 +296,13 @@ def some_collaborator_task(self):
f"Invalid 'backend' value '{backend}', accepted values are "
+ "'ray', or 'single_process'"
)
if backend.find("ray") != -1:
if backend == "ray":
if not ray.is_initialized():
dh = kwargs.get("dashboard_host", "127.0.0.1")
dp = kwargs.get("dashboard_port", 5252)
ray.init(dashboard_host=dh, dashboard_port=dp)

self.collaborators_per_group = kwargs.get('collaborators_per_group', 3)
self.max_concurrency = kwargs.get('max_concurrency', 1)
self.number_of_actors = kwargs.get('number_of_actors', 3)
self.backend = backend
if aggregator is not None:
self.aggregator = self.__get_aggregator_object(aggregator)
Expand Down Expand Up @@ -373,7 +372,7 @@ def __get_collaborator_object(self, collaborators: List) -> Any:
)

if self.backend == "ray":
collaborator_ray_refs = ray_group_assign(collaborators, collaborators_per_group=self.collaborators_per_group, max_concurrency=self.max_concurrency)
collaborator_ray_refs = ray_group_assign(collaborators, number_of_actors=self.number_of_actors)
return collaborator_ray_refs

@property
Expand Down Expand Up @@ -528,7 +527,7 @@ def execute_agg_task(self, flspec_obj, f):
clones = [FLSpec._clones[col] for col in self.selected_collaborators]
self.join_step = False

if self.backend.find("ray") != -1:
if self.backend == "ray":
ray_executor = RayExecutor()
ray_executor.ray_call_put(
aggregator, flspec_obj, f.__name__, self.execute_agg_steps, clones
Expand Down Expand Up @@ -575,7 +574,7 @@ def execute_collab_task(
# filter exclude/include attributes for clone
self.filter_exclude_include(flspec_obj, f, selected_collaborators, **kwargs)

if self.backend.find("ray") != -1:
if self.backend == "ray":
ray_executor = RayExecutor()
# set runtime,collab private attributes and metaflowinterface
for col in selected_collaborators:
Expand All @@ -593,14 +592,14 @@ def execute_collab_task(
clone = FLSpec._clones[collab_name]
collaborator = self.__collaborators[collab_name]

if self.backend.find("ray") != -1:
if self.backend == "ray":
ray_executor.ray_call_put(
collaborator, clone, f.__name__, self.execute_collab_steps
)
else:
collaborator.execute_func(clone, f.__name__, self.execute_collab_steps)

if self.backend.find("ray") != -1:
if self.backend == "ray":
clones = ray_executor.ray_call_get()
FLSpec._clones.update(zip(selected_collaborators, clones))
clone = clones[0]
Expand Down

0 comments on commit 2d038c9

Please sign in to comment.