Run collaborator in docker #280

Closed
Changes from 52 commits (54 commits total)
a0935f0
wip
dmitryagapov Dec 8, 2021
6a5651f
wip
dmitryagapov Dec 16, 2021
dacd4f1
wip
dmitryagapov Dec 16, 2021
f2afcf9
refactoring
dmitryagapov Dec 23, 2021
52b8284
add aiodocker requirements
dmitryagapov Dec 23, 2021
add5710
refactoring
dmitryagapov Jan 10, 2022
20e318e
refactoring
dmitryagapov Jan 11, 2022
96d9c7d
refactoring
dmitryagapov Jan 12, 2022
517494d
refactoring
dmitryagapov Jan 12, 2022
b6c1102
Merge branch 'develop' into feature/run_collaborator_in_docker
dmitryagapov Jan 12, 2022
51db0d0
refactoring
dmitryagapov Jan 12, 2022
05c18c1
refactoring
dmitryagapov Jan 12, 2022
bde6023
refactoring
dmitryagapov Jan 12, 2022
f3ed426
create docker module
dmitryagapov Jan 19, 2022
7e65c40
refactoring
dmitryagapov Feb 2, 2022
aee9f57
Director aggregator communication by rpc (#334)
aleksandr-mokrov Feb 10, 2022
0dd2c7c
Merge branch 'develop' into feature/run_collaborator_in_docker
dmitryagapov Feb 16, 2022
9321627
add --use_docker to envoy
dmitryagapov Feb 16, 2022
7b9c625
Merge branch 'dockerezation-launch' into feature/run_collaborator_in_…
dmitryagapov Feb 16, 2022
41c8399
fix flake8
dmitryagapov Feb 16, 2022
a414cfb
add openfl.docker module to packages
dmitryagapov Feb 16, 2022
c9d0223
fix initial tensor path
dmitryagapov Feb 16, 2022
baa5be6
merge fix
dmitryagapov Feb 17, 2022
1cc73eb
add --use-docker flag for envoy
dmitryagapov Feb 17, 2022
898a22e
add --use-docker flag for director
dmitryagapov Feb 17, 2022
493eb96
fix
dmitryagapov Feb 17, 2022
3f907c1
Merge branch 'develop' into feature/run_collaborator_in_docker
dmitryagapov Mar 1, 2022
fe749ca
merge
dmitryagapov Mar 1, 2022
f5c4aac
fix
dmitryagapov Mar 3, 2022
bd3b31d
fix
dmitryagapov Mar 3, 2022
f3df364
fix
dmitryagapov Mar 3, 2022
96d5ffe
fix kvasir url
dmitryagapov Mar 9, 2022
33efbcc
fix
dmitryagapov Mar 9, 2022
c90322e
Merge remote-tracking branch 'openfl/develop' into develop
dmitryagapov Mar 10, 2022
82de52f
Merge branch 'develop' into feature/run_collaborator_in_docker
dmitryagapov Mar 10, 2022
9ffc06e
add docker proxy for director and envoy configs
dmitryagapov Mar 15, 2022
04a387e
add docker proxy for director and envoy configs
dmitryagapov Mar 16, 2022
c5457d0
fix
dmitryagapov Mar 18, 2022
814bc85
add buildargs config to envoy/director configs
dmitryagapov Mar 18, 2022
4923046
add buildargs config to envoy/director configs
dmitryagapov Mar 18, 2022
c335c78
docker config
dmitryagapov Mar 23, 2022
488054f
Merge branch 'develop' of github.com:intel/openfl into develop
dmitryagapov Mar 23, 2022
c98455f
Merge branch 'develop' into dockerezation-launch
dmitryagapov Mar 23, 2022
03ad397
Merge branch 'develop' into feature/run_collaborator_in_docker
dmitryagapov Mar 23, 2022
160b94c
Merge branch 'dockerezation-launch' into feature/run_collaborator_in_…
dmitryagapov Mar 23, 2022
b24b995
refactoring
dmitryagapov Mar 23, 2022
4d510fc
fixes
dmitryagapov Mar 25, 2022
e9a5a1c
fixes
dmitryagapov Mar 25, 2022
cfc178a
add volumes for PyTorch_Kvasir_UNet
dmitryagapov Mar 25, 2022
6e481be
fix
dmitryagapov Mar 25, 2022
425bc1a
send only one model to aggregator when last == best
dmitryagapov Mar 31, 2022
adf79c5
relative import to absolute
dmitryagapov Mar 31, 2022
79cf99a
fixes
dmitryagapov Apr 4, 2022
9d9a968
Diagrams
dmitryagapov Apr 5, 2022
7 changes: 7 additions & 0 deletions openfl-docker/Dockerfile.aggregator
@@ -0,0 +1,7 @@
FROM python:3.8

RUN pip install --upgrade pip
RUN pip install git+https://github.com/dmitryagapov/openfl.git@feature/run_collaborator_in_docker

COPY . /code
WORKDIR /code
11 changes: 11 additions & 0 deletions openfl-docker/Dockerfile.collaborator
@@ -0,0 +1,11 @@
FROM python:3.8

RUN pip install --upgrade pip
RUN pip install git+https://github.com/dmitryagapov/openfl.git@feature/run_collaborator_in_docker

WORKDIR /code
COPY ./requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

@@ -0,0 +1,4 @@
#!/bin/bash
set -e

fx director start --disable-tls -c director_config.yaml --use-docker
@@ -0,0 +1,6 @@
#!/bin/bash
set -e
ENVOY_NAME=$1
SHARD_CONF=$2

fx envoy start -n "$ENVOY_NAME" --disable-tls --envoy-config-path "$SHARD_CONF" -dh localhost -dp 50051
@@ -1,13 +1,16 @@
params:
cuda_devices: [0]
cuda_devices: [ 0 ]

optional_plugin_components:
cuda_device_monitor:
template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor
settings: []
cuda_device_monitor:
template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor
settings: [ ]

shard_descriptor:
template: dogs_cats_shard_descriptor.DogsCatsShardDescriptor
volumes:
- '~/.kaggle/kaggle.json'
- './data'
params:
data_folder: data
rank_worldsize: 1,2
@@ -1,6 +1,17 @@
settings:
listen_host: localhost
listen_port: 50050
sample_shape: ['300', '400', '3']
target_shape: ['300', '400']
sample_shape: [ '300', '400', '3' ]
target_shape: [ '300', '400' ]
envoy_health_check_period: 5 # in seconds
docker:
env:
http_proxy:
https_proxy:
no_proxy:
buildargs:
HTTP_PROXY:
HTTPS_PROXY:
NO_PROXY:
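Reviewer note: the new `docker` section carries proxy settings twice — as container environment variables and as image build args. A minimal sketch of how such a config might be translated into keyword arguments for a build call (the helper names and the use of docker-py style kwargs are assumptions, not code from this PR):

```python
# Sketch (assumption): turn the director_config.yaml `docker` section into
# kwargs for an image build (e.g. docker-py's client.images.build) and an
# env dict for container start. Keys left unset in YAML parse to None and
# are dropped here so they don't override the daemon's defaults.

def docker_build_kwargs(docker_cfg: dict) -> dict:
    """Return buildargs for an image build, omitting unset proxy values."""
    buildargs = {
        key: value
        for key, value in (docker_cfg.get('buildargs') or {}).items()
        if value is not None
    }
    return {'buildargs': buildargs}


def docker_run_env(docker_cfg: dict) -> dict:
    """Environment variables to pass when starting the container."""
    return {
        key: value
        for key, value in (docker_cfg.get('env') or {}).items()
        if value is not None
    }


# Example mirroring the YAML above, with one proxy filled in.
cfg = {
    'env': {'http_proxy': 'http://proxy:3128', 'https_proxy': None, 'no_proxy': None},
    'buildargs': {'HTTP_PROXY': 'http://proxy:3128', 'HTTPS_PROXY': None, 'NO_PROXY': None},
}
print(docker_build_kwargs(cfg))  # {'buildargs': {'HTTP_PROXY': 'http://proxy:3128'}}
print(docker_run_env(cfg))       # {'http_proxy': 'http://proxy:3128'}
```

Dropping `None` values rather than passing empty strings matters: an empty `HTTP_PROXY` build arg would actively disable any proxy configured in the base image.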


@@ -0,0 +1,4 @@
#!/bin/bash
set -e

fx director start --disable-tls -c director_config.yaml --use-docker
@@ -1,14 +1,27 @@
params:
cuda_devices: [0,2]

cuda_devices: [ 0, 2 ]
docker:
env:
http_proxy:
https_proxy:
no_proxy:
buildargs:
HTTP_PROXY:
HTTPS_PROXY:
NO_PROXY:
volumes:
- './kvasir_data'

optional_plugin_components:
cuda_device_monitor:
template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor
settings: []
cuda_device_monitor:
template: openfl.plugins.processing_units_monitor.pynvml_monitor.PynvmlCUDADeviceMonitor
settings: [ ]

shard_descriptor:
template: kvasir_shard_descriptor.KvasirShardDescriptor
params:
data_folder: kvasir_data
rank_worldsize: 1,10
enforce_image_hw: '300,400'
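Reviewer note: the `volumes` list names host paths to expose to the collaborator container. A sketch of one way such entries could be mapped to docker-py style bind mounts — the `/code` mount target and the helper itself are assumptions, not code from this PR:

```python
# Sketch (assumption): mount each configured host path at a same-named
# location under the container's /code working directory.
import os


def volumes_to_binds(volumes, mode='rw'):
    """Map host paths from an envoy config's `volumes` list to bind specs."""
    binds = {}
    for vol in volumes:
        # Use the final path component as the mount name inside the container.
        name = os.path.basename(os.path.normpath(os.path.expanduser(vol)))
        binds[vol] = {'bind': f'/code/{name}', 'mode': mode}
    return binds


print(volumes_to_binds(['./kvasir_data']))
# {'./kvasir_data': {'bind': '/code/kvasir_data', 'mode': 'rw'}}
```

Under this scheme the dogs-and-cats config's `~/.kaggle/kaggle.json` would land at `/code/kaggle.json`, so shard descriptors can keep using paths relative to the working directory.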


@@ -1,7 +1,11 @@
params:
cuda_devices: []
cuda_devices: [ ]
docker_env:
http_proxy:
https_proxy:
no_proxy:

optional_plugin_components: {}
optional_plugin_components: { }

shard_descriptor:
template: kvasir_shard_descriptor.KvasirShardDescriptor
@@ -0,0 +1,4 @@
#!/bin/bash
set -e

fx envoy start -n env_one --disable-tls --envoy-config-path envoy_config.yaml -dh localhost -dp 50050 --use-docker
159 changes: 132 additions & 27 deletions openfl/component/aggregator/aggregator.py
@@ -4,13 +4,17 @@
"""Aggregator module."""
import queue
from logging import getLogger
from pathlib import Path
from typing import List
from typing import Union

from openfl.component.aggregation_functions import WeightedAverage
from openfl.databases import TensorDB
from openfl.pipelines import NoCompressionPipeline
from openfl.pipelines import TensorCodec
from openfl.protocols import base_pb2
from openfl.protocols import utils
from openfl.transport.grpc.director_client import DirectorClient
from openfl.utilities import TaskResultKey
from openfl.utilities import TensorKey
from openfl.utilities.logs import write_metric
@@ -32,24 +36,33 @@ class Aggregator:
\* - plan setting.
"""

def __init__(self,

aggregator_uuid,
federation_uuid,
authorized_cols,

init_state_path,
best_state_path,
last_state_path,

assigner,

rounds_to_train=256,
single_col_cert_common_name=None,
compression_pipeline=None,
db_store_rounds=1,
write_logs=False,
**kwargs):
def __init__(
self,

aggregator_uuid,
federation_uuid,
authorized_cols,

init_state_path,
best_state_path,
last_state_path,

assigner,

director_host,
director_port,
experiment_name: str = None,
rounds_to_train=256,
single_col_cert_common_name=None,
compression_pipeline=None,
db_store_rounds=1,
write_logs=False,
tls: bool = False,
root_certificate: Union[Path, str] = None,
private_key: Union[Path, str] = None,
certificate: Union[Path, str] = None,
**kwargs
) -> None:
"""Initialize."""
self.round_number = 0
self.single_col_cert_common_name = single_col_cert_common_name
@@ -109,6 +122,17 @@ def __init__(self,
self.collaborator_tasks_results = {} # {TaskResultKey: list of TensorKeys}

self.collaborator_task_weight = {} # {TaskResultKey: data_size}
self.experiment_name = experiment_name

self.director_client = DirectorClient(
client_id=f'aggregator_{experiment_name}',
director_host=director_host,
director_port=director_port,
tls=tls,
root_certificate=root_certificate,
private_key=private_key,
certificate=certificate,
)

def _load_initial_tensors(self):
"""
@@ -516,7 +540,8 @@ def send_local_task_results(self, collaborator_name, round_number, task_name,
'task_name': task_name,
'metric_name': tensor_key.tensor_name,
'metric_value': metric_value,
'round': round_number}
'round': round_number
}
if self.write_logs:
self.log_metric(tensor_key.tags[-1], task_name,
tensor_key.tensor_name, nparray, round_number)
@@ -537,6 +562,55 @@ def send_local_task_results(self, collaborator_name, round_number, task_name,

self._end_of_task_check(task_name)

def get_experiment_description(self) -> dict:
"""Get a experiment information by name for specific user."""
progress = self.round_number / self.rounds_to_train
model_statuses = self.model_download_statuses
tasks = [{
'name': task['function'],
'description': 'Task description Mock',
} for task in self.assigner.tasks.values()]
collaborators = [{
'name': name,
'status': 'pending_mock',
'progress': 0.0,
'round': 0,
'current_task': 'Current Task Mock',
'next_task': 'Next Task Mock'
} for name in self.authorized_cols]
result = {
'current_round': self.round_number,
'total_rounds': self.rounds_to_train,
'download_statuses': {
'models': model_statuses,
'logs': [{
'name': 'aggregator',
'status': 'ready'
}],
},
'collaborators': collaborators,
'tasks': tasks,
'progress': progress
}
return result
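Reviewer note: the shape of the description dict is easier to see stripped of the mock fields. A standalone sketch (function name and reduced field set are illustrative, not the PR's API):

```python
# Sketch of get_experiment_description's structure: progress is simply the
# fraction of training rounds completed; collaborator/task statuses are
# still mocked in this PR.

def describe_experiment(round_number, rounds_to_train, collaborators, task_names):
    """Assemble a reduced experiment-status dict."""
    return {
        'current_round': round_number,
        'total_rounds': rounds_to_train,
        'collaborators': [{'name': name, 'status': 'pending_mock'} for name in collaborators],
        'tasks': [{'name': name} for name in task_names],
        'progress': round_number / rounds_to_train,
    }


desc = describe_experiment(4, 16, ['env_one', 'env_two'], ['train', 'validate'])
print(desc['progress'])  # 0.25
```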

@property
def model_download_statuses(self) -> List[dict]:
"""Return model download statuses representation."""
best_model_status = 'ready' if self.best_tensor_dict else 'pending'
last_model_status = 'ready' if self.last_tensor_dict else 'pending'
model_statuses = [{
'name': 'best',
'status': best_model_status,
}, {
'name': 'last',
'status': last_model_status,
}, {
'name': 'init',
'status': 'ready'
}]
return model_statuses
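Reviewer note: the status logic above is a pure function of which tensor dicts exist, which a standalone sketch makes explicit:

```python
# Sketch of the model_download_statuses logic: 'init' is always
# downloadable, while 'best' and 'last' flip from pending to ready once
# the corresponding tensor dict has been populated.

def model_download_statuses(best_tensor_dict, last_tensor_dict):
    """Return download-status entries for the three model artifacts."""
    return [
        {'name': 'best', 'status': 'ready' if best_tensor_dict else 'pending'},
        {'name': 'last', 'status': 'ready' if last_tensor_dict else 'pending'},
        {'name': 'init', 'status': 'ready'},
    ]


# Before any validation round has selected a best model:
statuses = model_download_statuses({}, {'conv1.weight': [0.1]})
print([s['status'] for s in statuses])  # ['pending', 'ready', 'ready']
```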

def _process_named_tensor(self, named_tensor, collaborator_name):
"""
Extract the named tensor fields.
@@ -557,10 +631,14 @@ def _process_named_tensor(self, named_tensor, collaborator_name):
The numpy array associated with the returned tensorkey
"""
raw_bytes = named_tensor.data_bytes
metadata = [{'int_to_float': proto.int_to_float,
'int_list': proto.int_list,
'bool_list': proto.bool_list}
for proto in named_tensor.transformer_metadata]
metadata = [
{
'int_to_float': proto.int_to_float,
'int_list': proto.int_list,
'bool_list': proto.bool_list,
}
for proto in named_tensor.transformer_metadata
]
# The tensor has already been transferred to the aggregator,
# so the newly constructed tensor should have the aggregator origin
tensor_key = TensorKey(
@@ -743,14 +821,15 @@ def _prepare_trained(self, tensor_name, origin, round_number, report, agg_result
# Finally, cache the updated model tensor
self.tensor_db.cache_tensor({final_model_tk: new_model_nparray})

def _compute_validation_related_task_metrics(self, task_name):
def _compute_validation_related_task_metrics(self, task_name) -> bool:
"""
Compute all validation related metrics.

Args:
task_name : str
The task name to compute

Returns:
is_best_model : bool
True if this task produced a new best model this round.
"""
is_best_model = False
# By default, print out all of the metrics that the validation
# task sent
# This handles getting the subset of collaborators that may be
@@ -795,7 +874,8 @@ def _compute_validation_related_task_metrics(self, task_name):
'task_name': task_name,
'metric_name': tensor_key.tensor_name,
'metric_value': agg_results.item(),
'round': round_number}
'round': round_number,
}

if agg_results is None:
self.logger.warning(
@@ -822,8 +902,26 @@ def _compute_validation_related_task_metrics(self, task_name):
f'model with score {agg_results:f}')
self.best_model_score = agg_results
self._save_model(round_number, self.best_state_path)
is_best_model = True
# self.upload_model_to_aggregator(model_type='best')
if 'trained' in tags:
self._prepare_trained(tensor_name, origin, round_number, report, agg_results)
return is_best_model

def upload_model_to_aggregator(self, model_type: str = 'last'):
if model_type not in ['last', 'best']:
raise ValueError(
f'Invalid {model_type=} for upload_model_to_aggregator function. '
f'Allowed values "last", "best"'
)
model_proto = utils.construct_model_proto(
self.last_tensor_dict, 0, NoCompressionPipeline()
)
self.director_client.upload_experiment_model(
experiment_name=self.experiment_name,
model_proto=model_proto,
model_type=model_type
)
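Reviewer note: `upload_model_to_aggregator` always serializes `self.last_tensor_dict` and only the `model_type` label varies — consistent with sending a single model when last == best. The guard can be sketched in isolation (helper name is illustrative):

```python
# Sketch of the model_type guard in upload_model_to_aggregator.
ALLOWED_MODEL_TYPES = ('last', 'best')


def validate_model_type(model_type):
    """Reject anything other than the two uploadable model labels."""
    if model_type not in ALLOWED_MODEL_TYPES:
        raise ValueError(
            f'Invalid model_type={model_type!r} for upload_model_to_aggregator. '
            f'Allowed values: "last", "best"'
        )
    return model_type


print(validate_model_type('best'))  # best
```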

def _end_of_round_check(self):
"""
@@ -844,15 +942,20 @@ def _end_of_round_check(self):

# Compute all validation related metrics
all_tasks = self.assigner.get_all_tasks_for_round(self.round_number)
is_best_model = False
for task_name in all_tasks:
self._compute_validation_related_task_metrics(task_name)
_is_best_model = self._compute_validation_related_task_metrics(task_name)
if _is_best_model:
is_best_model = True

# Once all of the task results have been processed
self.round_number += 1

# Save the latest model
self.logger.info(f'Saving round {self.round_number} model...')
self._save_model(self.round_number, self.last_state_path)
model_type = 'best' if is_best_model else 'last'
self.upload_model_to_aggregator(model_type=model_type)
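Reviewer note: this is the end-of-round decision the commit "send only one model to aggregator when last == best" refers to — one upload per round, tagged `best` if any validation task produced a new best model, otherwise `last`. Sketched standalone:

```python
# Sketch of the per-round upload decision: each validation task reports
# whether it found a new best model, and a single model upload is tagged
# accordingly.

def model_type_for_round(per_task_is_best):
    """Return the label for the one model uploaded at end of round."""
    return 'best' if any(per_task_is_best) else 'last'


print(model_type_for_round([False, True, False]))  # best
print(model_type_for_round([False, False]))        # last
```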

# TODO This needs to be fixed!
if self._time_to_quit():
@@ -893,7 +996,9 @@ def _log_big_warning(self):
def stop(self, failed_collaborator: str = None) -> None:
"""Stop aggregator execution."""
self.logger.info('Force stopping the aggregator execution.')
for collaborator_name in filter(lambda c: c != failed_collaborator, self.authorized_cols):

for collaborator_name in filter(lambda c: c != failed_collaborator,
self.authorized_cols):
self.logger.info(f'Sending signal to collaborator {collaborator_name} to shutdown...')
self.quit_job_sent_to.append(collaborator_name)
