diff --git a/Dockerfile b/Dockerfile
index ae9ef9ae..d62bc87d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,42 +1,32 @@
-# Base image to start with
 FROM ubuntu:20.04
-
+
 # Who maintains this DockerFile
-MAINTAINER Bart Cox <b.a.cox@tudelft.nl>
+MAINTAINER Jeroen Galjaard

 # Run build without interactive dialogue
 ARG DEBIAN_FRONTEND=noninteractive

-# Set required environmental variables for the working setup.
+# Set environment variables for GLOO and TP (needed for RPC calls)
 ENV GLOO_SOCKET_IFNAME=eth0
 ENV TP_SOCKET_IFNAME=eth0

 # Define the working directory of the current Docker container
 WORKDIR /opt/federation-lab

-# Update the Ubuntu software repository
+# Update the Ubuntu software repository and fetch packages
 RUN apt-get update \
     && apt-get install -y vim curl python3 python3-pip net-tools iproute2

-#COPY data/ ./data
-#COPY default_models ./default_models
-# Copy the current folder to the working directory
-ADD setup.py requirements.txt ./
-
 # Use cache for pip, otherwise we repeatedly pull from repository
+ADD setup.py requirements.txt ./
 RUN --mount=type=cache,target=/root/.cache/pip python3 -m pip install -r requirements.txt

 ADD configs configs
-
 ADD fltk fltk
+ADD scripts scripts
-
-# Install newest version of library
-RUN python3 -m setup install
-
-# Expose the container's port to the host OS
+# Expose default port 5000 to the host OS.
 EXPOSE 5000
-
 # Update relevant runtime configuration for experiment
 COPY cloud_configs/cloud_experiment.yaml configs/cloud_config.yaml
\ No newline at end of file
diff --git a/charts/README.md b/charts/README.md
index d3885b63..86452ead 100644
--- a/charts/README.md
+++ b/charts/README.md
@@ -1 +1,2 @@
-This chart was created by Kompose
+# FLTK Helm charts
+
diff --git a/cloud_configs/cloud_experiment.yaml b/cloud_configs/cloud_experiment.yaml
deleted file mode 100644
index ec24c498..00000000
--- a/cloud_configs/cloud_experiment.yaml
+++ /dev/null
@@ -1,35 +0,0 @@
-# Experiment configuration
-total_epochs: 130
-epochs_per_cycle: 1
-wait_for_clients: true
-net: Cifar10CNN
-dataset: cifar10
-sampler: "uniform"
-sampler_args:
-  - 0.5 # p degree
-  - 42 # random seed
-# Use cuda is available; setting to false will force CPU
-cuda: false
-experiment_prefix: 'experiment_single_machine'
-output_location: 'output'
-tensor_board_active: true
-clients_per_round: 50
-system:
-  federator:
-    # Use the SERVICE provided by the fl-server to connect
-    hostname: 'fl-server.test.svc.cluster.local'
-    # Default NIC is eth0
-    nic: 'eth0'
-  clients:
-    amount: 50
-poison:
-  seed: 420
-  ratio: 0.2
-  attack:
-    type: "flip"
-    config:
-      - 5: 3
-antidote:
-  type: "clustering"
-  f: 0
-  k: 1
\ No newline at end of file
diff --git a/configs/example_cloud_experiment.json b/configs/example_cloud_experiment.json
index 767deaea..11930d48 100644
--- a/configs/example_cloud_experiment.json
+++ b/configs/example_cloud_experiment.json
@@ -7,13 +7,16 @@
     },
     "client": {
       "prefix": "client",
-      "tensorboard_active": true
+      "tensorboard_active": false
     }
   },
   "execution_config": {
     "experiment_prefix": "cloud_experiment",
-    "tensorboard_active": true,
     "cuda": false,
+    "tensorboard": {
+      "active": true,
+      "record_dir": true
+    },
     "net": {
       "save_model": false,
       "save_temp_model": false,
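The `execution_config` change above nests the TensorBoard settings under a dedicated `tensorboard` key rather than a flat `tensorboard_active` flag. A minimal sketch of how a consumer might read the new block; the `record_dir` interpretation and the log path are assumptions, and only `SummaryWriter` is a real PyTorch API:

```python
import json

from torch.utils.tensorboard import SummaryWriter

with open('configs/example_cloud_experiment.json') as f:
    cfg = json.load(f)

tb_cfg = cfg['execution_config']['tensorboard']
if tb_cfg['active']:
    # Assumption: 'record_dir' toggles a per-experiment log directory;
    # the concrete path below is illustrative, not the repo's behaviour.
    log_dir = 'output/cloud_experiment/tensorboard' if tb_cfg['record_dir'] else None
    writer = SummaryWriter(log_dir=log_dir)
```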
diff --git a/deploy/templates/client_stub_default.yml b/deploy/templates/client_stub_default.yml
deleted file mode 100644
index d65e2624..00000000
--- a/deploy/templates/client_stub_default.yml
+++ /dev/null
@@ -1,21 +0,0 @@
-client_name: # name can be anything
-#  container_name: federation-lab-client2 # what the name for this container would be
-  restart: "no" # if it crashes for example
-  build: . # look for the docker file where this file is currently located
-  volumes:
-    - ./docker_data:/opt/federation-lab/data
-    - ./default_models:/opt/federation-lab/default_models
-    - ./data_loaders:/opt/federation-lab/data_loaders
-  environment:
-    - PYTHONUNBUFFERED=1
-    - RANK={rank}
-    - WORLD_SIZE={world_size}
-  ports:
-    - "5002:5000" # {machine-port}:{docker-port}
-  depends_on:
-    - "fl_server"
-  deploy:
-    resources:
-      limits:
-        cpus: '1.25'
-        memory: 1024M
diff --git a/deploy/templates/client_stub_medium.yml b/deploy/templates/client_stub_medium.yml
deleted file mode 100644
index 7083a3b6..00000000
--- a/deploy/templates/client_stub_medium.yml
+++ /dev/null
@@ -1,21 +0,0 @@
-client_name: # name can be anything
-#  container_name: federation-lab-client2 # what the name for this container would be
-  restart: "no" # if it crashes for example
-  build: . # look for the docker file where this file is currently located
-  volumes:
-    - ./docker_data:/opt/federation-lab/data
-    - ./default_models:/opt/federation-lab/default_models
-    - ./data_loaders:/opt/federation-lab/data_loaders
-  environment:
-    - PYTHONUNBUFFERED=1
-    - RANK={rank}
-    - WORLD_SIZE={world_size}
-  ports:
-    - "5002:5000" # {machine-port}:{docker-port}
-  depends_on:
-    - "fl_server"
-  deploy:
-    resources:
-      limits:
-        cpus: '0.75'
-        memory: 1024M
diff --git a/deploy/templates/client_stub_slow.yml b/deploy/templates/client_stub_slow.yml
deleted file mode 100644
index 03a3fe48..00000000
--- a/deploy/templates/client_stub_slow.yml
+++ /dev/null
@@ -1,21 +0,0 @@
-client_name: # name can be anything
-#  container_name: federation-lab-client2 # what the name for this container would be
-  restart: "no" # if it crashes for example
-  build: . # look for the docker file where this file is currently located
-  volumes:
-    - ./docker_data:/opt/federation-lab/data
-    - ./default_models:/opt/federation-lab/default_models
-    - ./data_loaders:/opt/federation-lab/data_loaders
-  environment:
-    - PYTHONUNBUFFERED=1
-    - RANK={rank}
-    - WORLD_SIZE={world_size}
-  ports:
-    - "5002:5000" # {machine-port}:{docker-port}
-  depends_on:
-    - "fl_server"
-  deploy:
-    resources:
-      limits:
-        cpus: '0.5'
-        memory: 1024M
diff --git a/deploy/templates/system_stub.yml b/deploy/templates/system_stub.yml
deleted file mode 100644
index eda5fc6d..00000000
--- a/deploy/templates/system_stub.yml
+++ /dev/null
@@ -1,23 +0,0 @@
-# creating a multi-container docker
-version: "3.3"
-services:
-  fl_server: # name can be anything
-    container_name: federation-lab-server # what the name for this container would be
-    restart: "no" # if it crashes for example
-    build: . # look for the docker file where this file is currently located
-    volumes:
-#      - ./data/MNIST:/opt/federation-lab/data/MNIST
-      - ./output:/opt/federation-lab/output
-    environment:
-      - PYTHONUNBUFFERED=1
-      - RANK=0
-      - WORLD_SIZE={world_size}
-    ports:
-      - "5000:5000" # {machine-port}:{docker-port}
-    networks:
-      default:
-        ipv4_address: 10.5.0.11
-networks:
-  default:
-    external:
-      name: local_network_dev
\ No newline at end of file
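These deleted stubs were consumed by `fltk/util/generate_docker_compose.py` (reformatted further down in this diff): the `{rank}` and `{world_size}` placeholders in the `environment` list are filled with plain `str.format`. A condensed sketch of that substitution with illustrative values; the stub path refers to the old layout that this diff removes:

```python
import copy

import yaml

# Load a client stub and fill its placeholders (condensed from generate_client()).
with open('deploy/templates/client_stub_default.yml') as file:
    template = yaml.full_load(file)

client = copy.deepcopy(template)
name = list(client.keys())[0]  # 'client_name' in the stub above
for key, item in enumerate(client[name]['environment']):
    if item == 'RANK={rank}':
        client[name]['environment'][key] = item.format(rank=3)
    if item == 'WORLD_SIZE={world_size}':
        client[name]['environment'][key] = item.format(world_size=21)
```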
diff --git a/examples/change_world_size_gcp.py b/examples/change_world_size_gcp.py
deleted file mode 100644
index 8889b19c..00000000
--- a/examples/change_world_size_gcp.py
+++ /dev/null
@@ -1,113 +0,0 @@
-import argparse
-import os
-import time
-import googleapiclient.discovery
-from googleapiclient.errors import HttpError
-
-def update_startup_script(compute, project, zone, name_template, rank, world_size, host, nic, region):
-    instance_name = name_template.format(rank=rank)
-    startup_script = open(
-        os.path.join(
-            os.path.dirname(__file__), 'startup-script_template.sh'), 'r').read()
-    startup_args = {
-        'rank_arg': rank,
-        'world_size_arg': world_size,
-        'host_arg': host,
-        'nic_arg': nic
-    }
-    instanceget = compute.instances().get(project=project, zone=zone, instance=instance_name).execute()
-
-    fingerprint = instanceget['metadata']['fingerprint']
-    instance_id = instanceget['id']
-    # Insert values for startup script in template
-    startup_script = startup_script.format(**startup_args)
-    client_body = {
-        "fingerprint": fingerprint,
-        "items": [
-            {
-                "key": "startup-script",
-                "value": startup_script
-            }
-        ]
-    }
-    print(f'Changing startup script of instance {instance_name}')
-    return compute.instances().setMetadata(
-        project=project,
-        zone=zone,
-        instance=instance_id,
-        body=client_body).execute()
-
-# [START list_instances]
-def list_instances(compute, project, zone):
-    result = compute.instances().list(project=project, zone=zone).execute()
-
-    result2 = compute.machineImages().list(project=project).execute()
-    print(result2)
-    return result['items'] if 'items' in result else None
-# [END list_instances]
-
-# [START wait_for_operation]
-def wait_for_operation(compute, project, zone, operation):
-    print('Waiting for operation to finish...')
-    while True:
-        result = compute.zoneOperations().get(
-            project=project,
-            zone=zone,
-            operation=operation).execute()
-
-        if result['status'] == 'DONE':
-            print("done.")
-            if 'error' in result:
-                raise Exception(result['error'])
-            return result
-
-        time.sleep(1)
-# [END wait_for_operation]
-
-
-
-if __name__ == "__main__":
-
-    parser = argparse.ArgumentParser(description='Change the world-size of VMs in GCP')
-    parser.add_argument('--num_clients', type=int, default=20, help='The number of clients (excluding the Federator) in the system')
-    parser.add_argument('--project', type=str, default='tud-federated-learning', help='The Google Cloud Platform project name')
-    args = parser.parse_args()
-
-    num_clients = args.num_clients
-    project_name = args.project
-
-    # Change these values if desired
-    region = 'europe-west4'
-    zone_name = f'{region}-a'
-    instance_name='tud-federated-learning-automated-instance'
-    name_template = 'tud-fl-client-{rank}'
-    name_template_federator = 'tud-fl-federator-{rank}'
-
-    # The world size is number of clients + 1
-    world_size = num_clients + 1
-    nic = 'ens4' # Default nic in GCP ubuntu machines
-
-    # Create GCP API instance
-    compute = googleapiclient.discovery.build('compute', 'beta')
-    instances = list_instances(compute, project_name, zone_name)
-    federator_ip = [x['networkInterfaces'][0]['networkIP'] for x in instances if x['name']==name_template_federator.format(rank=0)][0]
-    host = federator_ip
-
-    ############################
-    ## Alter Clients metadata ##
-    ############################
-    operations = []
-    for id in range(1, num_clients+1):
-        try:
-            operations.append(update_startup_script(compute, project_name, zone_name, name_template, id, world_size, host, nic, region))
-        except HttpError as http_error:
-            if http_error.status_code == 409 and http_error.error_details[0]['reason'] == 'alreadyExists':
-                print('Resource already exists, continue with the next')
-                continue
-            else:
-                raise http_error
-    for operation in operations:
-        wait_for_operation(compute, project_name, zone_name, operation['name'])
-
-    print("""The world-size of the clients are updated""")
-
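The essential move in `update_startup_script` is GCE's optimistic concurrency control on instance metadata: `setMetadata` only succeeds when it echoes back the `fingerprint` returned by a prior `instances().get`, which is why the script reads the instance before writing. Condensed to that core, assuming `compute`, `project`, `zone`, and `rendered_script` are defined as in the script above:

```python
# Read-modify-write on instance metadata, guarded by the metadata fingerprint.
inst = compute.instances().get(project=project, zone=zone,
                               instance='tud-fl-client-3').execute()
body = {
    'fingerprint': inst['metadata']['fingerprint'],  # must match the server's current value
    'items': [{'key': 'startup-script', 'value': rendered_script}],
}
compute.instances().setMetadata(project=project, zone=zone,
                                instance=inst['id'], body=body).execute()
```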
diff --git a/examples/deploy.py b/examples/deploy.py
deleted file mode 100644
index ae51ff1a..00000000
--- a/examples/deploy.py
+++ /dev/null
@@ -1,269 +0,0 @@
-import argparse
-import os
-import time
-import googleapiclient.discovery
-from googleapiclient.errors import HttpError
-
-
-def create_federator(compute, project, zone, name_template, rank, region, machine_image):
-    machine_type = f'zones/{zone}/machineTypes/g1-small'
-    instance_name = name_template.format(rank=rank)
-    subnetwork = f'projects/{project}/regions/{region}/subnetworks/default'
-
-    print(instance_name)
-    client_config = {
-        "kind": "compute#instance",
-        "name": instance_name,
-        "zone": zone,
-        "minCpuPlatform": "Automatic",
-        "machineType": machine_type,
-        "displayDevice": {
-            "enableDisplay": False
-        },
-        "metadata": {
-            "kind": "compute#metadata",
-            "items": [],
-        },
-        "tags": {
-            "items": [
-                "http-server",
-                "https-server"
-            ]
-        },
-        "canIpForward": False,
-        "networkInterfaces": [
-            {
-                "kind": "compute#networkInterface",
-                "subnetwork": subnetwork,
-                "accessConfigs": [
-                    {
-                        "kind": "compute#accessConfig",
-                        "name": "External NAT",
-                        "type": "ONE_TO_ONE_NAT",
-                        "networkTier": "PREMIUM"
-                    }
-                ],
-                "aliasIpRanges": []
-            }
-        ],
-        "description": "",
-        "labels": {
-            "experiment": "ex-c20"
-        },
-        "scheduling": {
-            "preemptible": False,
-            "onHostMaintenance": "MIGRATE",
-            "automaticRestart": True,
-            "nodeAffinities": []
-        },
-        "deletionProtection": False,
-        "reservationAffinity": {
-            "consumeReservationType": "ANY_RESERVATION"
-        },
-        "serviceAccounts": [
-            {
-                "email": "default",
-                "scopes": [
-                    "https://www.googleapis.com/auth/devstorage.read_only",
-                    "https://www.googleapis.com/auth/logging.write",
-                    "https://www.googleapis.com/auth/monitoring.write",
-                    "https://www.googleapis.com/auth/servicecontrol",
-                    "https://www.googleapis.com/auth/service.management.readonly",
-                    "https://www.googleapis.com/auth/trace.append"
-                ]
-            }
-        ],
-        "sourceMachineImage": machine_image,
-        "shieldedInstanceConfig": {
-            "enableSecureBoot": False,
-            "enableVtpm": False,
-            "enableIntegrityMonitoring": True
-        },
-        "confidentialInstanceConfig": {
-            "enableConfidentialCompute": False
-        }
-    }
-    return compute.instances().insert(
-        project=project,
-        zone=zone,
-        body=client_config).execute()
-
-def create_client(compute, project, zone, name_template, rank, world_size, host, nic, region, machine_image):
-    machine_type = f'zones/{zone}/machineTypes/g1-small'
-    instance_name = name_template.format(rank=rank)
-    subnetwork = f'projects/{project}/regions/{region}/subnetworks/default'
-    startup_script = open(
-        os.path.join(
-            os.path.dirname(__file__), 'startup-script_template.sh'), 'r').read()
-    startup_args = {
-        'rank_arg': rank,
-        'world_size_arg': world_size,
-        'host_arg': host,
-        'nic_arg': nic
-    }
-    startup_script = startup_script.format(**startup_args)
-    print(instance_name)
-
-    client_config = {
-        "kind": "compute#instance",
-        "name": instance_name,
-        "zone": zone,
-        "minCpuPlatform": "Automatic",
-        "machineType": machine_type,
-        "displayDevice": {
-            "enableDisplay": False
-        },
-        "metadata": {
-            "kind": "compute#metadata",
-            "items": [
-                {
-                    # Startup script is automatically executed by the
-                    # instance upon startup.
-                    'key': 'startup-script',
-                    'value': startup_script
-                }
-            ],
-        },
-        "tags": {
-            "items": []
-        },
-        "canIpForward": False,
-        "networkInterfaces": [
-            {
-                "kind": "compute#networkInterface",
-                "subnetwork": subnetwork,
-                "aliasIpRanges": []
-            }
-        ],
-        "description": "",
-        "labels": {
-            "experiment": "ex-c20"
-        },
-        "scheduling": {
-            "preemptible": False,
-            "onHostMaintenance": "MIGRATE",
-            "automaticRestart": True,
-            "nodeAffinities": []
-        },
-        "deletionProtection": False,
-        "reservationAffinity": {
-            "consumeReservationType": "ANY_RESERVATION"
-        },
-        "serviceAccounts": [
-            {
-                "email": "default",
-                "scopes": [
-                    "https://www.googleapis.com/auth/devstorage.read_only",
-                    "https://www.googleapis.com/auth/logging.write",
-                    "https://www.googleapis.com/auth/monitoring.write",
-                    "https://www.googleapis.com/auth/servicecontrol",
-                    "https://www.googleapis.com/auth/service.management.readonly",
-                    "https://www.googleapis.com/auth/trace.append"
-                ]
-            }
-        ],
-        "sourceMachineImage": machine_image,
-        "shieldedInstanceConfig": {
-            "enableSecureBoot": False,
-            "enableVtpm": False,
-            "enableIntegrityMonitoring": True
-        },
-        "confidentialInstanceConfig": {
-            "enableConfidentialCompute": False
-        }
-    }
-    return compute.instances().insert(
-        project=project,
-        zone=zone,
-        body=client_config).execute()
-
-# [START list_instances]
-def list_instances(compute, project, zone):
-    result = compute.instances().list(project=project, zone=zone).execute()
-
-    result2 = compute.machineImages().list(project=project).execute()
-    print(result2)
-    return result['items'] if 'items' in result else None
-# [END list_instances]
-
-# [START wait_for_operation]
-def wait_for_operation(compute, project, zone, operation):
-    print('Waiting for operation to finish...')
-    while True:
-        result = compute.zoneOperations().get(
-            project=project,
-            zone=zone,
-            operation=operation).execute()
-
-        if result['status'] == 'DONE':
-            print("done.")
-            if 'error' in result:
-                raise Exception(result['error'])
-            return result
-
-        time.sleep(1)
-# [END wait_for_operation]
-
-
-
-if __name__ == "__main__":
-
-    parser = argparse.ArgumentParser(description='Create VMs in GCP for Federated Learning')
-    parser.add_argument('--num_clients', type=int, default=20,
-                        help='The number of clients (excluding the Federator) in the system')
-    parser.add_argument('--project', type=str, default='tud-federated-learning',
-                        help='The Google Cloud Platform project name')
-    parser.add_argument('--machine_image', type=str, default='c20-machine-image',
-                        help='The Google Cloud Platform project name')
-    args = parser.parse_args()
-
-    num_clients = args.num_clients
-    project_name = args.project
-    machine_image_name = args.machine_image
-
-    # Change these values if desired
-    region = 'europe-west4'
-    zone_name = f'{region}-a'
-    instance_name='tud-federated-learning-automated-instance'
-    name_template = 'tud-fl-client-{rank}'
-    name_template_federator = 'tud-fl-federator-{rank}'
-    world_size = num_clients + 1
-    nic = 'ens4' # Default nic in GCP ubuntu machines
-    machine_image = f'projects/{project_name}/global/machineImages/{machine_image_name}'
-    compute = googleapiclient.discovery.build('compute', 'beta')
-
-    ######################
-    ## Create Federator ##
-    ######################
-    try:
-        federator_operation = create_federator(compute, project_name, zone_name, name_template_federator, 0, region, machine_image)
-        wait_for_operation(compute, project_name, zone_name, federator_operation['name'])
-    except HttpError as http_error:
-        if http_error.status_code == 409 and http_error.error_details[0]['reason'] == 'alreadyExists':
-            print('Resource already exists, continue with the next')
-        else:
-            raise http_error
-
-    instances = list_instances(compute, project_name, zone_name)
-    federator_ip = [x['networkInterfaces'][0]['networkIP'] for x in instances if x['name']==name_template_federator.format(rank=0)][0]
-    host = federator_ip
-
-    ####################
-    ## Create Clients ##
-    ####################
-    operations = []
-    for id in range(1, num_clients+1):
-        try:
-            operations.append(create_client(compute, project_name, zone_name, name_template, id, world_size, host, nic, region, machine_image))
-            wait_for_operation(compute, project_name, zone_name, operations[-1]['name'])
-        except HttpError as http_error:
-            if http_error.status_code == 409 and http_error.error_details[0]['reason'] == 'alreadyExists':
-                print('Resource already exists, continue with the next')
-                continue
-            else:
-                raise http_error
-    for operation in operations:
-        wait_for_operation(compute, project_name, zone_name, operation['name'])
-
-    print("""Now login via ssh into the federator VM and start the experiment.""")
-
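Both GCP scripts follow the same two-phase pattern: `instances().insert` returns a zone operation immediately, completion is observed by polling `zoneOperations().get` until `status == 'DONE'` (the `wait_for_operation` helper), and re-runs are made idempotent by swallowing HTTP 409 `alreadyExists` errors. The skeleton, with names as defined in the script above:

```python
from googleapiclient.errors import HttpError

try:
    # Returns immediately with an operation handle, not a finished instance.
    op = compute.instances().insert(project=project_name, zone=zone_name,
                                    body=client_config).execute()
    wait_for_operation(compute, project_name, zone_name, op['name'])  # poll until DONE
except HttpError as http_error:
    # Re-running the deployment is safe: existing instances are skipped.
    if http_error.status_code == 409 and http_error.error_details[0]['reason'] == 'alreadyExists':
        print('Resource already exists, continue with the next')
    else:
        raise
```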
diff --git a/examples/startup-script_template.sh b/examples/startup-script_template.sh
deleted file mode 100644
index 2a709325..00000000
--- a/examples/startup-script_template.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/bash
-
-python3 -m fltk remote --rank={rank_arg} --world_size={world_size_arg} --host={host_arg} --nic={nic_arg}
\ No newline at end of file
diff --git a/fltk/__main__.py b/fltk/__main__.py
index e32b682c..b993464f 100644
--- a/fltk/__main__.py
+++ b/fltk/__main__.py
@@ -44,7 +44,7 @@ def main():
         exit(-1)

     if arguments.mode == 'orchestrator':
-        start_clusterized(arguments, config)
+        cluster_start(arguments, config)
     elif arguments.mode == 'client':
         run_single()

@@ -68,7 +68,7 @@ def perform_single_experiment(args, cfg, parser, yaml_data):
     run_single(rank=args.rank, world_size=world_size, host=master_address, args=cfg, nic=nic)


-def start_clusterized(args: dict, config: BareConfig):
+def cluster_start(args: dict, config: BareConfig):
     """
     Function to start poisoned experiment.
     """
@@ -84,8 +84,6 @@ def start_clusterized(args: dict, config: BareConfig):
         pool.join()

-
-    print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=cfg, nic={nic}')
     run_single(rank=args.rank, args=config, nic=nic)
""" @@ -84,8 +84,6 @@ def start_clusterized(args: dict, config: BareConfig): pool.join() - - print(f'rank={args.rank}, world_size={world_size}, host={master_address}, args=cfg, nic={nic}') run_single(rank=args.rank, args=config, nic=nic) diff --git a/fltk/client.py b/fltk/client.py index f4f6434e..999e6abb 100644 --- a/fltk/client.py +++ b/fltk/client.py @@ -3,7 +3,6 @@ import gc import logging import os -import random import time import traceback @@ -41,19 +40,18 @@ def _remote_method_async(method, rref, *args, **kwargs): args = [method, rref] + list(args) return rpc.rpc_async(rref.owner(), _call_method, args=args, kwargs=kwargs) + class Client: - counter = 0 finished_init = False dataset = None epoch_counter = 0 + net: torch.nn.Module - def __init__(self, id, log_rref, rank, world_size, config: BareConfig = None): + def __init__(self, id, log_rref, config: BareConfig = None): logging.info(f'Welcome to client {id}') - self.net: torch.nn.Module = None + self.id = id self.log_rref = log_rref - self.rank = rank - self.world_size = world_size self.args = config self.args.init_logger(logging) @@ -93,15 +91,6 @@ def ping(self): """ return 'pong' - def rpc_test(self): - sleep_time = random.randint(1, 5) - time.sleep(sleep_time) - self.local_log(f'sleep for {sleep_time} seconds') - self.counter += 1 - log_line = f'Number of times called: {self.counter}' - self.local_log(log_line) - self.remote_log(log_line) - def remote_log(self, message): _remote_method_async(DistLearningLogger.log, self.log_rref, self.id, message, time.time()) diff --git a/fltk/datasets/__init__.py b/fltk/datasets/__init__.py index 12141278..e0abbd66 100644 --- a/fltk/datasets/__init__.py +++ b/fltk/datasets/__init__.py @@ -1 +1,4 @@ from .distributed import * +from .cifar10 import CIFAR10Dataset +from .cifar100 import CIFAR100Dataset +from .fashion_mnist import FashionMNISTDataset \ No newline at end of file diff --git a/fltk/datasets/data_distribution/iid_equal.py b/fltk/datasets/data_distribution/iid_equal.py index c47bcc16..05ab1a4b 100644 --- a/fltk/datasets/data_distribution/iid_equal.py +++ b/fltk/datasets/data_distribution/iid_equal.py @@ -1,5 +1,3 @@ -import torch - def distribute_batches_equally(train_data_loader, num_workers): """ Gives each worker the same number of batches of training data. @@ -9,7 +7,7 @@ def distribute_batches_equally(train_data_loader, num_workers): :param num_workers: number of workers :type num_workers: int """ - distributed_dataset = [[] for i in range(num_workers)] + distributed_dataset = [[] for _ in range(num_workers)] for batch_idx, (data, target) in enumerate(train_data_loader): worker_idx = batch_idx % num_workers diff --git a/fltk/launch.py b/fltk/launch.py index 8058719e..3f591e03 100644 --- a/fltk/launch.py +++ b/fltk/launch.py @@ -1,21 +1,15 @@ -import os -import torch.distributed.rpc as rpc import logging +import os - +import torch.distributed.rpc as rpc import torch.multiprocessing as mp -from fltk.orchestrator import Orchestrator +from fltk.orchestrator import run_ps from fltk.util.env.learner_environment import prepare_environment logging.basicConfig(level=logging.INFO) -def run_ps(rpc_ids_triple, args): - print(f'Starting the federator...') - fed = Orchestrator(rpc_ids_triple, config=args) - fed.run() - def await_assigned_orchestrator(): # TODO: Implement await function for client @@ -28,7 +22,9 @@ def await_assigned_orchestrator(): 6. Terminate/complete pod execution. 
""" pass -def run_single(rank, world_size, host = None, args = None, nic = None): + + +def run_single(rank, world_size, host=None, args=None, nic=None): logging.info(f'Starting with rank={rank} and world size={world_size}') prepare_environment(host, nic) @@ -60,6 +56,7 @@ def run_single(rank, world_size, host = None, args = None, nic = None): # block until all rpc finish rpc.shutdown() + # def run_single(rank, world_size, host = None, args = None, nic = None): def run_spawn(config): @@ -70,4 +67,4 @@ def run_spawn(config): args=(world_size, master_address, config), nprocs=world_size, join=True - ) \ No newline at end of file + ) diff --git a/fltk/orchestrator.py b/fltk/orchestrator.py index 7c5b4369..d98916df 100644 --- a/fltk/orchestrator.py +++ b/fltk/orchestrator.py @@ -348,3 +348,9 @@ def distribute_new_model(self, updated_model) -> None: for res in responses: res[1].wait() logging.info('Weights are updated') + + +def run_ps(rpc_ids_triple, args): + print(f'Starting the federator...') + fed = Orchestrator(rpc_ids_triple, config=args) + fed.run() \ No newline at end of file diff --git a/fltk/util/data_loader_utils.py b/fltk/util/data_loader_utils.py index d01cf3a4..af58449c 100644 --- a/fltk/util/data_loader_utils.py +++ b/fltk/util/data_loader_utils.py @@ -1,10 +1,12 @@ -import numpy -from torch.utils.data import DataLoader - import os import pickle import random -from ..datasets.distributed import DistDataset as Dataset + +import numpy +from torch.utils.data import DataLoader + +from fltk.datasets.dataset import Dataset + def generate_data_loaders_from_distributed_dataset(distributed_dataset, batch_size): """ @@ -17,10 +19,13 @@ def generate_data_loaders_from_distributed_dataset(distributed_dataset, batch_si """ data_loaders = [] for worker_training_data in distributed_dataset: - data_loaders.append(Dataset.get_data_loader_from_data(batch_size, worker_training_data[0], worker_training_data[1], shuffle=True)) + data_loaders.append( + Dataset.get_data_loader_from_data(batch_size, worker_training_data[0], worker_training_data[1], + shuffle=True)) return data_loaders + def load_train_data_loader(logger, args): """ Loads the training data DataLoader object from a file if available. @@ -36,12 +41,14 @@ def load_train_data_loader(logger, args): raise FileNotFoundError("Couldn't find train data loader stored in file") + def generate_train_loader(args, dataset): train_dataset = dataset.get_train_dataset() X, Y = shuffle_data(args, train_dataset) return dataset.get_data_loader_from_data(args.get_batch_size(), X, Y) + def load_test_data_loader(logger, args): """ Loads the test data DataLoader object from a file if available. @@ -56,6 +63,7 @@ def load_test_data_loader(logger, args): raise FileNotFoundError("Couldn't find train data loader stored in file") + def load_data_loader_from_file(logger, filename) -> DataLoader: """ Loads DataLoader object from a file if available. 
diff --git a/fltk/util/data_loader_utils.py b/fltk/util/data_loader_utils.py
index d01cf3a4..af58449c 100644
--- a/fltk/util/data_loader_utils.py
+++ b/fltk/util/data_loader_utils.py
@@ -1,10 +1,12 @@
-import numpy
-from torch.utils.data import DataLoader
-
 import os
 import pickle
 import random
-from ..datasets.distributed import DistDataset as Dataset
+
+import numpy
+from torch.utils.data import DataLoader
+
+from fltk.datasets.dataset import Dataset
+

 def generate_data_loaders_from_distributed_dataset(distributed_dataset, batch_size):
     """
@@ -17,10 +19,13 @@ def generate_data_loaders_from_distributed_dataset(distributed_dataset, batch_si
     """
     data_loaders = []
     for worker_training_data in distributed_dataset:
-        data_loaders.append(Dataset.get_data_loader_from_data(batch_size, worker_training_data[0], worker_training_data[1], shuffle=True))
+        data_loaders.append(
+            Dataset.get_data_loader_from_data(batch_size, worker_training_data[0], worker_training_data[1],
+                                              shuffle=True))

     return data_loaders

+
 def load_train_data_loader(logger, args):
     """
     Loads the training data DataLoader object from a file if available.
@@ -36,12 +41,14 @@ def load_train_data_loader(logger, args):

     raise FileNotFoundError("Couldn't find train data loader stored in file")

+
 def generate_train_loader(args, dataset):
     train_dataset = dataset.get_train_dataset()
     X, Y = shuffle_data(args, train_dataset)

     return dataset.get_data_loader_from_data(args.get_batch_size(), X, Y)

+
 def load_test_data_loader(logger, args):
     """
     Loads the test data DataLoader object from a file if available.
@@ -56,6 +63,7 @@ def load_test_data_loader(logger, args):

     raise FileNotFoundError("Couldn't find train data loader stored in file")

+
 def load_data_loader_from_file(logger, filename) -> DataLoader:
     """
     Loads DataLoader object from a file if available.
@@ -68,12 +76,14 @@ def load_data_loader_from_file(logger, filename) -> DataLoader:
     with open(filename, "rb") as f:
         return load_saved_data_loader(f)

+
 def generate_test_loader(args, dataset):
     test_dataset = dataset.get_test_dataset()
     X, Y = shuffle_data(args, test_dataset)

     return dataset.get_data_loader_from_data(args.get_test_batch_size(), X, Y)

+
 def shuffle_data(args, dataset):
     data = list(zip(dataset[0], dataset[1]))
     random.shuffle(data)
@@ -83,8 +93,10 @@ def shuffle_data(args, dataset):

     return X, Y

+
 def load_saved_data_loader(file_obj):
     return pickle.load(file_obj)

+
 def save_data_loader_to_file(data_loader, file_obj):
     pickle.dump(data_loader, file_obj)
diff --git a/fltk/util/default_models.py b/fltk/util/default_models.py
index eda04fb3..6ed3cb1a 100644
--- a/fltk/util/default_models.py
+++ b/fltk/util/default_models.py
@@ -1,8 +1,8 @@
-
+import logging
 import os
+
 import torch
-import logging
-logging.basicConfig(level=logging.DEBUG)
+
 from fltk.nets import Cifar10CNN, FashionMNISTCNN, Cifar100ResNet, FashionMNISTResNet, Cifar10ResNet, Cifar100VGG
 from fltk.util.arguments import Arguments

@@ -44,4 +44,4 @@
     # ----------- Cifar100VGG ---------
     # ---------------------------------
     full_save_path = os.path.join(args.get_default_model_folder_path(), "Cifar100VGG.model")
-    torch.save(Cifar100VGG().state_dict(), full_save_path)
\ No newline at end of file
+    torch.save(Cifar100VGG().state_dict(), full_save_path)
diff --git a/fltk/util/generate_data_distribution.py b/fltk/util/generate_data_distribution.py
index 34135c0b..70e70db3 100644
--- a/fltk/util/generate_data_distribution.py
+++ b/fltk/util/generate_data_distribution.py
@@ -2,7 +2,7 @@
 import os
 import logging

-from fltk.datasets import CIFAR10Dataset, FashionMNISTDataset, CIFAR100Dataset
+from fltk.datasets.cifar10 import CIFAR10Dataset
 from fltk.util.arguments import Arguments
 from fltk.util.data_loader_utils import generate_train_loader, generate_test_loader, save_data_loader_to_file
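The loader utilities above cache whole `DataLoader` objects with `pickle`. A round-trip sketch using the two helpers at the bottom of the file; `train_data_loader`, `logger`, and the cache path are assumed to exist and are illustrative only:

```python
from fltk.util.data_loader_utils import load_data_loader_from_file, save_data_loader_to_file

cache_path = 'data_loaders/cifar10/train_data_loader.pickle'  # illustrative path

# First run: persist the generated loader.
with open(cache_path, 'wb') as f:
    save_data_loader_to_file(train_data_loader, f)

# Later runs: skip regeneration and unpickle the cached loader instead.
cached_loader = load_data_loader_from_file(logger, cache_path)
```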
diff --git a/fltk/util/generate_docker_compose.py b/fltk/util/generate_docker_compose.py
index 52a7bb36..f8b8c168 100644
--- a/fltk/util/generate_docker_compose.py
+++ b/fltk/util/generate_docker_compose.py
@@ -1,19 +1,23 @@
+import copy
 import sys
+
 import yaml
-import copy

 template_path = './deploy/templates'

+
 def load_system_template():
     with open(f'{template_path}/system_stub.yml') as file:
         documents = yaml.full_load(file)
         return documents

+
 def load_client_template(type='default'):
     with open(f'{template_path}/client_stub_{type}.yml') as file:
         documents = yaml.full_load(file)
         return documents

+
 def generate_client(id, template: dict, world_size: int, type='default'):
     local_template = copy.deepcopy(template)
     key_name = list(local_template.keys())[0]
@@ -25,24 +29,24 @@ def generate_client(id, template: dict, world_size: int, type='default'):
         if item == 'WORLD_SIZE={world_size}':
             local_template[container_name]['environment'][key] = item.format(world_size=world_size)

-    local_template[container_name]['ports'] = [f'{5000+id}:5000']
+    local_template[container_name]['ports'] = [f'{5000 + id}:5000']
     return local_template, container_name


 def generate(num_clients: int):
     world_size = num_clients + 1
-    system_template :dict = load_system_template()
+    system_template: dict = load_system_template()

     for key, item in enumerate(system_template['services']['fl_server']['environment']):
         if item == 'WORLD_SIZE={world_size}':
             system_template['services']['fl_server']['environment'][key] = item.format(world_size=world_size)

-    for client_id in range(1, num_clients+1):
+    for client_id in range(1, num_clients + 1):
         client_type = 'default'
         if client_id == 1:
-            client_type='slow'
+            client_type = 'slow'
         if client_id == 2:
-            client_type='medium'
+            client_type = 'medium'
         client_template: dict = load_client_template(type=client_type)
         client_definition, container_name = generate_client(client_id, client_template, world_size, type=client_type)
         system_template['services'].update(client_definition)
@@ -52,8 +56,6 @@ def generate(num_clients: int):


 if __name__ == '__main__':
-
     num_clients = int(sys.argv[1])
     generate(num_clients)
     print('Done')
-
diff --git a/fltk/util/iid_equal.py b/fltk/util/iid_equal.py
index c47bcc16..0c13362b 100644
--- a/fltk/util/iid_equal.py
+++ b/fltk/util/iid_equal.py
@@ -1,5 +1,3 @@
-import torch
-
 def distribute_batches_equally(train_data_loader, num_workers):
     """
     Gives each worker the same number of batches of training data.
diff --git a/fltk/util/log.py b/fltk/util/log.py
index 52379999..e3296fc3 100644
--- a/fltk/util/log.py
+++ b/fltk/util/log.py
@@ -2,8 +2,9 @@

 from torch.distributed import rpc

+
 class DistLearningLogger:
     @staticmethod
     @rpc.functions.async_execution
     def log(arg1, node_id, log_line, report_time):
-        logging.info(f'[{node_id}: {report_time}]: {log_line}')
\ No newline at end of file
+        logging.info(f'[{node_id}: {report_time}]: {log_line}')
diff --git a/fltk/util/tensor_converter.py b/fltk/util/tensor_converter.py
index f5f7abee..1a0e2634 100644
--- a/fltk/util/tensor_converter.py
+++ b/fltk/util/tensor_converter.py
@@ -1,5 +1,6 @@
 import numpy

+
 def convert_distributed_data_into_numpy(distributed_dataset):
     """
     Converts a distributed dataset (returned by a data distribution method) from Tensors into numpy arrays.
diff --git a/scripts/install.sh b/scripts/install.sh
deleted file mode 100644
index c7c2cc95..00000000
--- a/scripts/install.sh
+++ /dev/null
@@ -1,12 +0,0 @@
-#!/usr/bin/env bash
-
-# CD-ing into the correct directory
-cd /home/$USER/fltk-testbed-gr-30
-
-# Pull most recent changes
-git pull
-
-# Install the changes
-sudo python3 setup.py install
-
-
diff --git a/setup.py b/setup.py
index b3acafe3..e549d27a 100644
--- a/setup.py
+++ b/setup.py
@@ -2,9 +2,9 @@
 setup(
     name="fltk",
     author="Bart Cox",
-    author_email="b.a.cox@tudelft.nl",
-    maintainer="Bart Cox",
-    maintainer_email="b.a.cox@tudelft.nl",
+    author_email="B.A.Cox@tudelft.nl",
+    maintainer="Jeroen Martijn Galjaard",
+    maintainer_email="J.M.Galjaard-1@student.tudelft.nl",
     description="Federated Learning Toolkit",
     packages=find_packages(),
     version='0.3.1',