Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change opt treatment and device policy to enum values #374

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions openfl-tutorials/Federated_FedProx_Keras_MNIST_Tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,8 @@
"outputs": [],
"source": [
"#Run experiment, return trained FederatedModel\n",
"final_fl_model = fx.run_experiment(collaborators,override_config={'aggregator.settings.rounds_to_train':5, 'collaborator.settings.opt_treatment': 'CONTINUE_GLOBAL'})"
"from openfl.utilities.enum_types import OptTreatment\n",
"final_fl_model = fx.run_experiment(collaborators,override_config={'aggregator.settings.rounds_to_train':5, 'collaborator.settings.opt_treatment': OptTreatment.CONTINUE_GLOBAL})"
]
},
{
Expand Down Expand Up @@ -354,7 +355,7 @@
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
Expand All @@ -368,7 +369,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
"version": "3.8.9"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please clear the notebook metadata

}
},
"nbformat": 4,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,13 +429,15 @@
"outputs": [],
"source": [
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(\n",
" model_provider=MI,\n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=10,\n",
" opt_treatment=\"CONTINUE_GLOBAL\",\n",
" device_assignment_policy=\"CUDA_PREFERRED\",\n",
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED,\n",
")"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,12 +510,14 @@
"outputs": [],
"source": [
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(model_provider=MI,\n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=5,\n",
" opt_treatment='CONTINUE_GLOBAL',\n",
" device_assignment_policy='CUDA_PREFERRED')"
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED)"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,12 +463,15 @@
"outputs": [],
"source": [
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(\n",
" model_provider=model_interface, \n",
" task_keeper=task_interface,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=5,\n",
" opt_treatment='CONTINUE_GLOBAL'\n",
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED\n",
")"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,12 +474,15 @@
"outputs": [],
"source": [
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(\n",
" model_provider=model_interface, \n",
" task_keeper=task_interface,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=5,\n",
" opt_treatment='CONTINUE_GLOBAL'\n",
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED\n",
")"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,13 +438,15 @@
"metadata": {},
"outputs": [],
"source": [
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(\n",
" model_provider=MI,\n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=2,\n",
" opt_treatment=\"CONTINUE_GLOBAL\",\n",
" device_assignment_policy=\"CUDA_PREFERRED\",\n",
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED,\n",
")"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,12 +484,14 @@
"# If I use autoreload I got a pickling error\n",
"\n",
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please split this import into separate lines

"\n",
"fl_experiment.start(model_provider=MI, \n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=2,\n",
" opt_treatment='CONTINUE_GLOBAL',\n",
" device_assignment_policy='CUDA_PREFERRED')\n"
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED)\n"
]
},
{
Expand Down Expand Up @@ -584,7 +586,7 @@
"source": [
"MI = ModelInterface(model=best_model, optimizer=optimizer_adam, framework_plugin=framework_adapter)\n",
"fl_experiment.start(model_provider=MI, task_keeper=TI, data_loader=fed_dataset, rounds_to_train=4, \\\n",
" opt_treatment='CONTINUE_GLOBAL')"
" opt_treatment=OptTreatment.CONTINUE_GLOBAL)"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,13 +596,15 @@
"metadata": {},
"outputs": [],
"source": [
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(\n",
" model_provider=MI,\n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=10,\n",
" opt_treatment=\"CONTINUE_GLOBAL\",\n",
" device_assignment_policy=\"CUDA_PREFERRED\",\n",
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED,\n",
")"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -965,12 +965,14 @@
"# If I use autoreload I got a pickling error\n",
"\n",
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(model_provider=MI, \n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=10,\n",
" opt_treatment='CONTINUE_GLOBAL',\n",
" device_assignment_policy='CUDA_PREFERRED')\n"
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED)\n"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,11 +543,14 @@
"# If I use autoreload I got a pickling error\n",
"\n",
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(model_provider=MI, \n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=3,\n",
" opt_treatment='RESET')"
" opt_treatment=OptTreatment.RESET,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED)"
]
},
{
Expand Down Expand Up @@ -590,4 +593,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,15 @@
"outputs": [],
"source": [
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(\n",
" model_provider=model_interface, \n",
" task_keeper=task_interface,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=5,\n",
" opt_treatment='CONTINUE_GLOBAL'\n",
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED\n",
")"
]
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,11 +360,14 @@
"outputs": [],
"source": [
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(model_provider=MI, \n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=5,\n",
" opt_treatment='CONTINUE_GLOBAL')"
" opt_treatment=OptTreatment.CONTINUE_GLOBAL,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED)"
]
},
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,14 @@
"# If I use autoreload I got a pickling error\n",
"\n",
"# The following command zips the workspace and python requirements to be transfered to collaborator nodes\n",
"from openfl.utilities.enum_types import DevicePolicy, OptTreatment\n",
"\n",
"fl_experiment.start(model_provider=MI, \n",
" task_keeper=TI,\n",
" data_loader=fed_dataset,\n",
" rounds_to_train=20,\n",
" opt_treatment='RESET')"
" opt_treatment=OptTreatment.RESET,\n",
" device_assignment_policy=DevicePolicy.CUDA_PREFERRED)"
]
},
{
Expand Down
56 changes: 9 additions & 47 deletions openfl/component/collaborator/collaborator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

"""Collaborator module."""

from enum import Enum
from logging import getLogger
from time import sleep
from typing import Tuple
Expand All @@ -12,35 +11,11 @@
from openfl.pipelines import NoCompressionPipeline
from openfl.pipelines import TensorCodec
from openfl.protocols import utils
from openfl.utilities.enum_types import DevicePolicy
from openfl.utilities.enum_types import OptTreatment
from openfl.utilities import TensorKey


class DevicePolicy(Enum):
"""Device assignment policy."""

CPU_ONLY = 1

CUDA_PREFERRED = 2


class OptTreatment(Enum):
"""Optimizer Methods.

- RESET tells each collaborator to reset the optimizer state at the beginning
of each round.

- CONTINUE_LOCAL tells each collaborator to continue with the local optimizer
state from the previous round.

- CONTINUE_GLOBAL tells each collaborator to continue with the federally
averaged optimizer state from the previous round.
"""

RESET = 1
CONTINUE_LOCAL = 2
CONTINUE_GLOBAL = 3


class Collaborator:
r"""The Collaborator object class.

Expand All @@ -49,8 +24,8 @@ class Collaborator:
aggregator_uuid: The unique id for the client
federation_uuid: The unique id for the federation
model: The model
opt_treatment* (string): The optimizer state treatment (Defaults to
"CONTINUE_GLOBAL", which is aggreagated state from previous round.)
opt_treatment* (enum.Enum): The optimizer state treatment (Defaults to
OptTreatment.CONTINUE_GLOBAL, which is aggreagated state from previous round.)

compression_pipeline: The compression pipeline (Defaults to None)

Expand All @@ -74,8 +49,8 @@ def __init__(self,
client,
task_runner,
task_config,
opt_treatment='RESET',
device_assignment_policy='CPU_ONLY',
opt_treatment=OptTreatment.RESET,
device_assignment_policy=DevicePolicy.CPU_ONLY,
delta_updates=False,
compression_pipeline=None,
db_store_rounds=1,
Expand Down Expand Up @@ -105,23 +80,10 @@ def __init__(self,

self.logger = getLogger(__name__)

# RESET/CONTINUE_LOCAL/CONTINUE_GLOBAL
if hasattr(OptTreatment, opt_treatment):
self.opt_treatment = OptTreatment[opt_treatment]
else:
self.logger.error(f'Unknown opt_treatment: {opt_treatment.name}.')
raise NotImplementedError(f'Unknown opt_treatment: {opt_treatment}.')

if hasattr(DevicePolicy, device_assignment_policy):
self.device_assignment_policy = DevicePolicy[device_assignment_policy]
else:
self.logger.error('Unknown device_assignment_policy: '
f'{device_assignment_policy.name}.')
raise NotImplementedError(
f'Unknown device_assignment_policy: {device_assignment_policy}.'
)
self.opt_treatment = opt_treatment
self.device_assignment_policy = device_assignment_policy

self.task_runner.set_optimizer_treatment(self.opt_treatment.name)
self.task_runner.set_optimizer_treatment(self.opt_treatment)

def set_available_devices(self, cuda: Tuple[str] = ()):
"""
Expand Down
11 changes: 11 additions & 0 deletions openfl/federated/plan/plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from openfl.interface.cli_helper import WORKSPACE
from openfl.transport import AggregatorGRPCClient
from openfl.transport import AggregatorGRPCServer
from openfl.utilities.enum_types import OptTreatment
from openfl.utilities.utils import getfqdn_env

SETTINGS = 'settings'
Expand Down Expand Up @@ -458,6 +459,16 @@ def get_collaborator(self, collaborator_name, root_certificate=None, private_key

defaults[SETTINGS]['compression_pipeline'] = self.get_tensor_pipe()
defaults[SETTINGS]['task_config'] = self.config.get('tasks', {})

opt_treatment = defaults[SETTINGS]['opt_treatment']
if isinstance(opt_treatment, str) and hasattr(OptTreatment, opt_treatment):
defaults[SETTINGS]['opt_treatment'] = OptTreatment[opt_treatment].value
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it is better to save a string here, to make reviewing plan.yaml easier.
In experiment it is using plan.config['collaborator']['settings']['opt_treatment'] = opt_treatment.name.
We should save values in plan in the same format.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I and @igor-davidyuk discussed about it and thought of using value instead of name

elif isinstance(opt_treatment, int) and OptTreatment(opt_treatment):
pass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we parse integer value here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If opt_treatment is already an integer (from line 463), then it's already parsed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this check then?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So both integer and string values are appropriate here if there are multiple collaborators. In both of these conditions, it shouldn't raise an exception but for all other cases it should.

else:
self.logger.error(f'Unknown opt_treatment: {opt_treatment}.')
raise NotImplementedError(f'Unknown opt_treatment: {opt_treatment}.')

if client is not None:
defaults[SETTINGS]['client'] = client
else:
Expand Down
3 changes: 2 additions & 1 deletion openfl/federated/task/runner_fe.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numpy as np

from openfl.utilities.enum_types import OptTreatment
from openfl.utilities import split_tensor_dict_for_holdouts
from openfl.utilities import TensorKey
from .runner import TaskRunner
Expand Down Expand Up @@ -148,7 +149,7 @@ def train(self, col_name, round_num, input_tensor_dict, epochs, **kwargs):
# A work around could involve doing a single epoch of training
# on random data to get the optimizer names, and then throwing away
# the model.
if self.opt_treatment == 'CONTINUE_GLOBAL':
if self.opt_treatment == OptTreatment.CONTINUE_GLOBAL:
self.initialize_tensorkeys_for_functions(with_opt_vars=True)

return global_tensor_dict, local_tensor_dict
Expand Down
Loading