Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing flake8 to make CI green on Ubuntu #1113

Merged
merged 1 commit into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions openfl-workspace/torch_template/src/dataloader.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# Copyright (C) 2024 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.
# Licensed subject to the terms of the separately executed evaluation license agreement between
# Intel Corporation and you.

from openfl.federated import PyTorchDataLoader


class TemplateDataLoader(PyTorchDataLoader):
"""Template dataloader for PyTorch.
This class should be used as a template to create a custom DataLoader for your specific dataset.
Expand All @@ -12,14 +14,18 @@ class TemplateDataLoader(PyTorchDataLoader):
The `plan.yaml` modifications should be done under the `<workspace>/plan/plan.yaml` section:
```
data_loader:
defaults : plan/defaults/data_loader.yaml
template: src.dataloader.TemplateDataLoader # Modify this line appropriately if you change the class name
defaults: plan/defaults/data_loader.yaml
template: src.dataloader.TemplateDataLoader # Modify this line appropriately if you
change the class name
settings:
# Add additional arguments (such as batch_size) that you wish to pass through `def __init__():`
# Add additional arguments (such as batch_size) that you wish
# to pass through `def __init__():`
# You do not need to pass in data_path here. It will be set by the collaborators
```
`batch_size` is passed to the `super().`__init__` method to ensure that the superclass is properly initialized with the specified batch size.
After calling `super().__init__`, define `self.X_train`, `self.y_train`, `self.X_valid`, and `self.y_valid`.
`batch_size` is passed to the `super().`__init__` method to ensure that the superclass
is properly initialized with the specified batch size.
After calling `super().__init__`, define `self.X_train`, `self.y_train`,
`self.X_valid`, and `self.y_valid`.
"""

def __init__(self, data_path, batch_size, **kwargs):
Expand All @@ -40,6 +46,7 @@ def __init__(self, data_path, batch_size, **kwargs):
self.X_valid = X_valid
self.y_valid = y_valid


def load_dataset(data_path, **kwargs):
"""
Load your dataset here.
Expand All @@ -65,4 +72,5 @@ def load_dataset(data_path, **kwargs):
return X_train, y_train, X_valid, y_valid


raise NotImplementedError("Use <workspace>/src/dataloader.py template to create a custom dataloader. Then remove this line.")
raise NotImplementedError("Use <workspace>/src/dataloader.py template to "
"create a custom dataloader. Then remove this line.")
21 changes: 14 additions & 7 deletions openfl-workspace/torch_template/src/taskrunner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Copyright (C) 2024 Intel Corporation
# Licensed subject to the terms of the separately executed evaluation license agreement between Intel Corporation and you.
# Licensed subject to the terms of the separately executed evaluation license agreement between
# Intel Corporation and you.

import numpy as np

Expand All @@ -8,20 +9,25 @@
from openfl.federated import PyTorchTaskRunner
from openfl.utilities import Metric


class TemplateTaskRunner(PyTorchTaskRunner):
"""Template Task Runner for PyTorch.

This class should be used as a template to create a custom Task Runner for your specific model and training workload.
This class should be used as a template to create a custom Task Runner for your specific model
and training workload.
After generating this template, you should:
1. Define your model, optimizer, and loss function as you would in PyTorch. PyTorchTaskRunner inherits from torch.nn.Module.
2. Implement the `train_` and `validate_` functions to define a single train and validate epoch of your workload.
1. Define your model, optimizer, and loss function as you would in PyTorch.
PyTorchTaskRunner inherits from torch.nn.Module.
2. Implement the `train_` and `validate_` functions to define a single train
and validate epoch of your workload.
3. Modify the `plan.yaml` file to use this Task Runner.

The `plan.yaml` modifications should be done under the `<workspace>/plan/plan.yaml` section:
```
task_runner:
defaults : plan/defaults/task_runner.yaml
template: src.taskrunner.TemplateTaskRunner # Modify this line appropriately if you change the class name
template: src.taskrunner.TemplateTaskRunner # Modify this line appropriately
if you change the class name
settings:
# Add additional arguments that you wish to pass through `__init__`
```
Expand Down Expand Up @@ -86,8 +92,8 @@ def validate_(
"""Single validation epoch.

Args:
validation_dataloader: Validation dataset batch generator. Yields (samples, targets) tuples.
of size = `self.validation_dataloader.batch_size`.
validation_dataloader: Validation dataset batch generator. Yields (samples, targets)
tuples of size = `self.validation_dataloader.batch_size`.

Returns:
Metric: An object containing the name of the metric and its value as an np.ndarray.
Expand All @@ -102,6 +108,7 @@ def validate_(
accuracy = None # Placeholder for accuracy calculation.
return Metric(name="accuracy", value=np.array(accuracy))


raise NotImplementedError(
"Use <workspace>/src/taskrunner.py template to create a custom Task Runner "
"with your model definition and training/validation logic. Then remove this line."
Expand Down
2 changes: 1 addition & 1 deletion tests/github/interactive_api_director/experiment_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def run_federation(shards: typing.Dict[str, Shard], director_path: str):
logger.info('Starting the experiment!')
running_processes = []
p = subprocess.Popen(
f"fx director start --disable-tls",
"fx director start --disable-tls",
shell=True,
cwd=os.path.join(director_path)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as tsf

from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.data_loader import read_data
from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.data_loader import read_data # noqa: E501
from openfl.interface.interactive_api.experiment import DataInterface


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
from openfl.interface.interactive_api.shard_descriptor import ShardDataset
from openfl.interface.interactive_api.shard_descriptor import ShardDescriptor
from openfl.utilities import validate_file_hash
from zipfile import ZipFile
import requests


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import PIL
import torch.optim as optim
from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.model import UNet
from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.tasks import task_interface
from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.tasks import task_interface # noqa: E501
from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.tasks import validate
from torch.utils.data import DataLoader
from torch.utils.data import Dataset
Expand Down Expand Up @@ -37,7 +37,8 @@ def run():
optimizer_adam = optim.Adam(model_unet.parameters(), lr=1e-4)

framework_adapter = 'openfl.plugins.frameworks_adapters.pytorch_adapter.FrameworkAdapterPlugin'
MI = ModelInterface(model=model_unet, optimizer=optimizer_adam, framework_plugin=framework_adapter)
MI = ModelInterface(model=model_unet, optimizer=optimizer_adam,
framework_plugin=framework_adapter)

# Save the initial model state
initial_model = deepcopy(model_unet)
Expand All @@ -60,7 +61,8 @@ def run():


# Register dataset
# We extract User dataset class implementation. Is it convinient? What if the dataset is not a class?
# We extract User dataset class implementation.
# Is it convinient? What if the dataset is not a class?
class KvasirShardDataset(Dataset):

def __init__(self, dataset):
Expand Down Expand Up @@ -113,7 +115,8 @@ def shard_descriptor(self, shard_descriptor):
validation_size = max(1, int(len(self._shard_dataset) * self.validation_fraction))

self.train_indeces = np.arange(len(self._shard_dataset) - validation_size)
self.val_indeces = np.arange(len(self._shard_dataset) - validation_size, len(self._shard_dataset))
self.val_indeces = np.arange(len(self._shard_dataset) - validation_size,
len(self._shard_dataset))

def get_train_loader(self, **kwargs):
"""
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import torch
import torch.nn as nn

from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.layers import DoubleConv, Down, Up
from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.layers import DoubleConv, Down, Up # noqa: E501

"""
UNet model definition
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import numpy as np

from openfl.interface.interactive_api.experiment import TaskInterface
from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.layers import soft_dice_loss, soft_dice_coef
from tests.github.interactive_api_director.experiments.pytorch_kvasir_unet.layers import soft_dice_loss, soft_dice_coef # noqa: E501


task_interface = TaskInterface()
Expand Down Expand Up @@ -58,8 +58,8 @@ def validate(unet_model, val_loader, device):
for data, target in val_loader:
samples = target.shape[0]
total_samples += samples
data, target = torch.tensor(data).to(device), \
torch.tensor(target).to(device, dtype=torch.int64)
data = torch.tensor(data).to(device)
target = torch.tensor(target).to(device, dtype=torch.int64)
output = unet_model(data)
val = soft_dice_coef(output, target)
val_score += val.sum().cpu().numpy()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ def shard_descriptor(self, shard_descriptor):
self._shard_descriptor = shard_descriptor
validation_size = len(self.shard_descriptor) // 10
self.train_indices = np.arange(len(self.shard_descriptor) - validation_size)
self.val_indices = np.arange(len(self.shard_descriptor) - validation_size, len(self.shard_descriptor))
self.val_indices = np.arange(len(self.shard_descriptor) - validation_size,
len(self.shard_descriptor))

def get_train_loader(self, **kwargs):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,11 @@ def __init__(self, rank_worldsize: str = '1,1') -> None:
self.X_test = x_test[self.rank - 1::self.worldsize]
self.y_test = y_test[self.rank - 1::self.worldsize]


# Calculating data and target shapes
sample, _ = self[0]
self._sample_shape = [str(dim) for dim in sample.shape]
self._target_shape = ['0']


def __getitem__(self, index):
"""Return a item by the index."""
if index < len(self.X_train):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,37 @@
import tensorflow as tf
# Create a federation
from openfl.interface.interactive_api.federation import Federation
from openfl.interface.interactive_api.experiment import TaskInterface, DataInterface, ModelInterface, FLExperiment
from openfl.interface.interactive_api.experiment import TaskInterface, ModelInterface, FLExperiment
from tests.github.interactive_api_director.experiments.tensorflow_mnist.dataset import FedDataset
from tests.github.interactive_api_director.experiments.tensorflow_mnist.settings import model
from tests.github.interactive_api_director.experiments.tensorflow_mnist.settings import optimizer
from tests.github.interactive_api_director.experiments.tensorflow_mnist.settings import loss_fn
from tests.github.interactive_api_director.experiments.tensorflow_mnist.settings import train_acc_metric
from tests.github.interactive_api_director.experiments.tensorflow_mnist.settings import val_acc_metric
from tests.github.interactive_api_director.experiments.tensorflow_mnist.envoy.shard_descriptor import MNISTShardDescriptor
from tests.github.interactive_api_director.experiments.tensorflow_mnist.settings import train_acc_metric # noqa: E501
from tests.github.interactive_api_director.experiments.tensorflow_mnist.settings import val_acc_metric # noqa: E501
from tests.github.interactive_api_director.experiments.tensorflow_mnist.envoy.shard_descriptor import MNISTShardDescriptor # noqa: E501


def run():
# please use the same identificator that was used in signed certificate
client_id = 'frontend'

# 1) Run with API layer - Director mTLS
# If the user wants to enable mTLS their must provide CA root chain, and signed key pair to the federation interface
# If the user wants to enable mTLS their must provide CA root chain,
# and signed key pair to the federation interface
# cert_chain = 'cert/root_ca.crt'
# API_certificate = 'cert/frontend.crt'
# API_private_key = 'cert/frontend.key'

# federation = Federation(client_id='frontend', director_node_fqdn='localhost', director_port='50051',
# cert_chain=cert_chain, api_cert=API_certificate, api_private_key=API_private_key)
# federation = Federation(client_id='frontend', director_node_fqdn='localhost',
# director_port='50051', cert_chain=cert_chain,
# api_cert=API_certificate, api_private_key=API_private_key)

# --------------------------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------

# 2) Run with TLS disabled (trusted environment)
# Federation can also determine local fqdn automatically
federation = Federation(client_id=client_id, director_node_fqdn='localhost', director_port='50051', tls=False)
federation = Federation(client_id=client_id, director_node_fqdn='localhost',
director_port='50051', tls=False)

shard_registry = federation.get_shard_registry()
print(shard_registry)
Expand All @@ -40,19 +43,17 @@ def run():
for sample in samples:
print(sample.shape)


framework_adapter = 'openfl.plugins.frameworks_adapters.keras_adapter.FrameworkAdapterPlugin'
MI = ModelInterface(model=model, optimizer=optimizer, framework_plugin=framework_adapter)


def function_defined_in_notebook(some_parameter):
print(f'Also I accept a parameter and it is {some_parameter}')


TI = TaskInterface()

# Task interface currently supports only standalone functions.
@TI.register_fl_task(model='model', data_loader='train_dataset',
device='device', optimizer='optimizer')
device='device', optimizer='optimizer')
def train(model, train_dataset, optimizer, device, loss_fn=loss_fn, warmup=False):

# Iterate over the batches of the dataset.
Expand Down Expand Up @@ -85,7 +86,6 @@ def train(model, train_dataset, optimizer, device, loss_fn=loss_fn, warmup=False

return {'train_acc': train_acc}


@TI.register_fl_task(model='model', data_loader='val_dataset', device='device')
def validate(model, val_dataset, device):
# Run a validation loop at the end of each epoch.
Expand All @@ -97,26 +97,23 @@ def validate(model, val_dataset, device):
val_acc_metric.reset_states()
print("Validation acc: %.4f" % (float(val_acc),))

return {'validation_accuracy': val_acc,}
return {'validation_accuracy': val_acc, }
# Save the initial model state
train(model,fed_dataset.get_train_loader(), optimizer, 'cpu', warmup=True)
train(model, fed_dataset.get_train_loader(), optimizer, 'cpu', warmup=True)
initial_model = tf.keras.models.clone_model(model)



# The Interactive API supports registering functions definied in main module or imported.


# create an experimnet in federation
experiment_name = 'mnist_test_experiment'
fl_experiment = FLExperiment(
federation=federation,
experiment_name=experiment_name,
serializer_plugin='openfl.plugins.interface_serializer.'
'keras_serializer.KerasSerializer')
fl_experiment = FLExperiment(federation=federation,
experiment_name=experiment_name,
serializer_plugin='openfl.plugins.interface_serializer.'
'keras_serializer.KerasSerializer')
# If I use autoreload I got a pickling error

# The following command zips the workspace and python requirements to be transfered to collaborator nodes
# The following command zips the workspace and python requirements
# to be transfered to collaborator nodes
fl_experiment.start(model_provider=MI,
task_keeper=TI,
data_loader=fed_dataset,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
Expand Down
1 change: 1 addition & 0 deletions tests/github/pki_wrong_cn.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import openfl.native as fx
from openfl.utilities.utils import getfqdn_env


def prepare_workspace():
subprocess.check_call(['fx', 'workspace', 'certify'])
subprocess.check_call(['fx', 'plan', 'initialize'])
Expand Down
1 change: 1 addition & 0 deletions tests/github/test_gandlf.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from tests.github.utils import create_collaborator, certify_aggregator
from openfl.utilities.utils import getfqdn_env


def exec(command, directory):
os.chdir(directory)
check_call(command)
Expand Down
Loading