
Commit

Enable offloading
bacox committed Jan 12, 2022
1 parent 872d206 commit 16a4e91
Showing 13 changed files with 301 additions and 98 deletions.
2 changes: 1 addition & 1 deletion Dockerfile
@@ -45,5 +45,5 @@ EXPOSE 5000
COPY fltk ./fltk
COPY configs ./configs
#CMD python3 ./fltk/__main__.py single configs/experiment.yaml --rank=$RANK
CMD python3 -m fltk single configs/experiment.yaml --rank=$RANK
CMD python3 -m fltk single configs/experiment_vanilla.yaml --rank=$RANK
#CMD python3 setup.py
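The new CMD line launches the package with a "single" mode, a config path, and a --rank flag taken from the container's $RANK. fltk/__main__.py is not part of this diff, so the following is only an illustrative sketch of an entry point that would accept that command line, not the project's actual parser:

import argparse

def main():
    parser = argparse.ArgumentParser(prog='fltk')
    subparsers = parser.add_subparsers(dest='mode', required=True)
    single = subparsers.add_parser('single')                 # matches "python3 -m fltk single ..."
    single.add_argument('config')                            # e.g. configs/experiment_vanilla.yaml
    single.add_argument('--rank', type=int, required=True)   # filled in from $RANK
    args = parser.parse_args()
    print(args.mode, args.config, args.rank)

if __name__ == '__main__':
    main()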
9 changes: 6 additions & 3 deletions configs/experiment.yaml
@@ -1,18 +1,21 @@
---
# Experiment configuration
total_epochs: 4
total_epochs: 30
epochs_per_cycle: 1
wait_for_clients: true
net: Cifar10CNN
dataset: cifar10
# Use cuda if available; setting to false will force CPU
cuda: false
experiment_prefix: 'experiment_sample'
offload_stategy: vanilla
profiling_time: 100
deadline: 500
output_location: 'output'
tensor_board_active: true
clients_per_round: 2
# sampler: "dirichlet" # "limit labels" || "q sampler" || "dirichlet" || "uniform" (default)
sampler: "uniform" # "limit labels" || "q sampler" || "dirichlet" || "uniform" (default)
sampler: "dirichlet" # "limit labels" || "q sampler" || "dirichlet" || "uniform" (default)
#sampler: "uniform" # "limit labels" || "q sampler" || "dirichlet" || "uniform" (default)
sampler_args:
- 0.07 # label limit || q probability || alpha || unused
- 42 # random seed || random seed || random seed || unused
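With the sampler switched from "uniform" to "dirichlet", sampler_args are read as the concentration alpha (0.07) and the random seed (42). The repository's sampler implementation is not shown in this commit; the snippet below is only an illustrative sketch of how a Dirichlet split with those arguments is commonly drawn, not the project's code:

import numpy as np

def dirichlet_partition(labels, num_clients, alpha=0.07, seed=42):
    rng = np.random.default_rng(seed)
    client_indices = [[] for _ in range(num_clients)]
    for cls in np.unique(labels):
        cls_idx = np.where(labels == cls)[0]
        rng.shuffle(cls_idx)
        # Per-client share of this class; a small alpha gives highly non-IID splits
        proportions = rng.dirichlet(alpha * np.ones(num_clients))
        split_points = (np.cumsum(proportions)[:-1] * len(cls_idx)).astype(int)
        for client, part in zip(client_indices, np.split(cls_idx, split_points)):
            client.extend(part.tolist())
    return client_indices

# Toy example with 2 clients (clients_per_round above) and 10 labels
print(dirichlet_partition(np.array([0, 1] * 5), num_clients=2))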
10 changes: 6 additions & 4 deletions configs/experiment_vanilla.yaml
@@ -1,19 +1,21 @@
---
# Experiment configuration
total_epochs: 4
total_epochs: 20
epochs_per_cycle: 1
wait_for_clients: true
net: Cifar10CNN
dataset: cifar10
# Use cuda if available; setting to false will force CPU
cuda: false
experiment_prefix: 'offloading_vanilla'
experiment_prefix: 'exp_offload_vanilla'
offload_stategy: vanilla
profiling_time: 100
deadline: 500
output_location: 'output'
tensor_board_active: true
clients_per_round: 2
# sampler: "dirichlet" # "limit labels" || "q sampler" || "dirichlet" || "uniform" (default)
sampler: "uniform" # "limit labels" || "q sampler" || "dirichlet" || "uniform" (default)
sampler: "dirichlet" # "limit labels" || "q sampler" || "dirichlet" || "uniform" (default)
#sampler: "uniform" # "limit labels" || "q sampler" || "dirichlet" || "uniform" (default)
sampler_args:
- 0.07 # label limit || q probability || alpha || unused
- 42 # random seed || random seed || random seed || unused
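The same offloading-related keys (offload_stategy, profiling_time, deadline) appear in both experiment files. The project's own Arguments/config class is not visible in this diff, so the snippet below only sketches how these keys could be read with PyYAML; it is not the repository's loading code:

import yaml

with open('configs/experiment_vanilla.yaml') as f:
    cfg = yaml.safe_load(f)

print(cfg['offload_stategy'])               # 'vanilla' (key spelled as in the file)
print(cfg['profiling_time'])                # 100
print(cfg['deadline'])                      # 500
print(cfg['sampler'], cfg['sampler_args'])  # 'dirichlet', [0.07, 42]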
2 changes: 1 addition & 1 deletion deploy/templates/client_stub_default.yml
@@ -20,4 +20,4 @@ client_name: # name can be anything
resources:
limits:
cpus: '2'
memory: 1024M
# memory: 1024M
2 changes: 1 addition & 1 deletion deploy/templates/client_stub_medium.yml
@@ -19,5 +19,5 @@ client_name: # name can be anything
deploy:
resources:
limits:
cpus: '0.75'
cpus: '1'
memory: 1024M
185 changes: 125 additions & 60 deletions fltk/client.py
@@ -15,6 +15,7 @@
from torch.distributed.rpc import RRef

from fltk.schedulers import MinCapableStepLR
from fltk.strategy.offloading import OffloadingStrategy
from fltk.util.arguments import Arguments
from fltk.util.fed_avg import average_nn_parameters
from fltk.util.log import FLLogger
@@ -68,6 +69,8 @@ class Client:
call_to_offload = False
client_to_offload_to : str = None

strategy = OffloadingStrategy.VANILLA


def __init__(self, id, log_rref, rank, world_size, config = None):
logging.info(f'Welcome to client {id}')
@@ -92,6 +95,43 @@ def __init__(self, id, log_rref, rank, world_size, config = None):
self.args.get_scheduler_step_size(),
self.args.get_scheduler_gamma(),
self.args.get_min_lr())
self.strategy = OffloadingStrategy.Parse(config.offload_strategy)
self.configure_strategy(self.strategy)


def configure_strategy(self, strategy : OffloadingStrategy):
if strategy == OffloadingStrategy.VANILLA:
logging.info('Running with offloading strategy: VANILLA')
self.deadline_enabled = False
self.swyh_enabled = False
self.freeze_layers_enabled = False
self.offload_enabled = False
if strategy == OffloadingStrategy.DEADLINE:
logging.info('Running with offloading strategy: DEADLINE')
self.deadline_enabled = True
self.swyh_enabled = False
self.freeze_layers_enabled = False
self.offload_enabled = False
if strategy == OffloadingStrategy.SWYH:
logging.info('Running with offloading strategy: SWYH')
self.deadline_enabled = True
self.swyh_enabled = True
self.freeze_layers_enabled = False
self.offload_enabled = False
if strategy == OffloadingStrategy.FREEZE:
logging.info('Running with offloading strategy: FREEZE')
self.deadline_enabled = True
self.swyh_enabled = False
self.freeze_layers_enabled = True
self.offload_enabled = False
if strategy == OffloadingStrategy.MODEL_OFFLOAD:
logging.info('Running with offloading strategy: MODEL_OFFLOAD')
self.deadline_enabled = True
self.swyh_enabled = False
self.freeze_layers_enabled = True
self.offload_enabled = True
logging.info(f'Offload strategy params: deadline={self.deadline_enabled}, swyh={self.swyh_enabled}, freeze={self.freeze_layers_enabled}, offload={self.offload_enabled}')
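
fltk/strategy/offloading.py is imported and used above (OffloadingStrategy.Parse and the five strategy names) but is not included in this view of the diff. A plausible minimal shape consistent with those calls is sketched below; the string keys accepted by Parse are assumptions, not confirmed by the commit:

from enum import Enum

class OffloadingStrategy(Enum):
    VANILLA = 1
    DEADLINE = 2
    SWYH = 3
    FREEZE = 4
    MODEL_OFFLOAD = 5

    @classmethod
    def Parse(cls, name: str) -> 'OffloadingStrategy':
        # Map a config string such as 'vanilla' onto a strategy member (assumed spellings)
        mapping = {
            'vanilla': cls.VANILLA,
            'deadline': cls.DEADLINE,
            'swyh': cls.SWYH,
            'freeze': cls.FREEZE,
            'offload': cls.MODEL_OFFLOAD,
        }
        return mapping[name.lower()]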


def init_device(self):
if self.args.cuda and torch.cuda.is_available():
@@ -254,19 +294,44 @@ def unfreeze_layers(self):
for param in self.net.parameters():
param.requires_grad = True
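
Only unfreeze_layers is visible in this hunk, while freeze_layers(15) is called further down. A minimal standalone counterpart, under the assumption that the argument counts the leading parameter tensors to freeze:

import torch.nn as nn

def freeze_layers(net: nn.Module, count: int) -> None:
    # Stop gradients for the first `count` parameter tensors; later layers keep training
    for i, param in enumerate(net.parameters()):
        if i < count:
            param.requires_grad = False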

def train(self, epoch, deadline_time: int = None):
def train(self, epoch, deadline: int = None):
"""
Different modes:
1. Vanilla
2. Deadline
3. SWYH
4. Just Freeze
5. Model Offload
:: Vanilla
Disable deadline
Disable swyh
Disable offload
:: Deadline
We need to keep track of the incoming deadline
We don't need to send data before the deadline
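:: SWYH
Same as Deadline, with swyh enabled
:: Just Freeze
Same as Deadline, with layer freezing enabled
:: Model Offload
Same as Deadline, with layer freezing and model offloading enabled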
:param epoch: Current epoch #
:type epoch: int
"""
start_time = time.time()
deadline_threshold = 5
train_stop_time = None
if self.deadline_enabled and deadline is not None:
train_stop_time = start_time + deadline - deadline_threshold

strategy = OffloadingStrategy.VANILLA

# Ignore profiler for now
# p = Profiler()
# p.attach(self.net)

# self.net.train()
global global_model_weights, global_offload_received
deadline_time = None
# deadline_time = None
# save model
if self.args.should_save_model(epoch):
self.save_model(epoch, self.args.get_epoch_save_start_suffix())
@@ -281,65 +346,58 @@ def train(self, epoch, deadline_time: int = None):
# performance_metric_interval = 20
# perf_resp = None

profiling_size = 40
# Profiling parameters
profiling_size = self.args.profiling_size
profiling_data = np.zeros(profiling_size)
active_profiling = True

control_start_time = time.time()
training_process = 0
for i, (inputs, labels) in enumerate(self.dataset.get_train_loader(), 0):
start_train_time = time.time()

# Check if there is a call to offload
if self.call_to_offload:
self.args.get_logger().info('Got call to offload model')
model_weights = self.get_nn_parameters()
# print(self.client_to_offload_to)
# r_ref = rpc.remote(self.client_to_offload_to, Client.static_ping, args=())
# print(f'Result of rref: {r_ref.to_here()}')
# ret = rpc.rpc_sync(self.client_to_offload_to, Client.static_ping, args=())
# print(f'Result of rref: {ret}')
# ret = rpc.rpc_sync(self.client_to_offload_to, Client.offload_receive_endpoint_2, args=(["Hello"]))
# print(f'Result of rref: {ret}')

ret = rpc.rpc_sync(self.client_to_offload_to, Client.offload_receive_endpoint, args=([model_weights]))
print(f'Result of rref: {ret}')

# r_ref = rpc.remote(self.client_to_offload_to, Client.static_ping, args=())
# r_ref = rpc.remote(self.client_to_offload_to, Client.offload_receive_endpoint_2, args=("Hello world"))
# _remote_method_async(Client.static_ping, self.client_to_offload_to)
# fut1 = rpc.rpc_async(self.client_to_offload_to, Client.ping)
# _remote_method_async_by_info(Client.offload_receive_endpoint, self.client_to_offload_to, model_weights)
self.call_to_offload = False
self.client_to_offload_to = None
# This number only works for cifar10cnn
self.freeze_layers(15)

# Check if there is a model to incorporate
if global_offload_received:
self.args.get_logger().info('Merging offloaded model')
self.args.get_logger().info('FedAvg locally with offloaded model')
updated_weights = average_nn_parameters([self.get_nn_parameters(), global_model_weights])
self.args.get_logger().info('Updating local weights due to offloading')
self.update_nn_parameters(updated_weights)
global_offload_received = False
global_model_weights = None


if deadline_time is not None:
if time.time() >= deadline_time:
self.args.get_logger().info('Stopping training due to deadline time')
break
else:
self.args.get_logger().info(f'Time to deadline: {deadline_time - time.time()}')
if self.offload_enabled:
# Check if there is a call to offload
if self.call_to_offload:
self.args.get_logger().info('Got call to offload model')
model_weights = self.get_nn_parameters()

ret = rpc.rpc_sync(self.client_to_offload_to, Client.offload_receive_endpoint, args=([model_weights]))
print(f'Result of rref: {ret}')

self.call_to_offload = False
self.client_to_offload_to = None
# This number only works for cifar10cnn
# @TODO: Make this dynamic for other networks
self.freeze_layers(15)

# Check if there is a model to incorporate
if global_offload_received:
self.args.get_logger().info('Merging offloaded model')
self.args.get_logger().info('FedAvg locally with offloaded model')
updated_weights = average_nn_parameters([self.get_nn_parameters(), global_model_weights])
self.args.get_logger().info('Updating local weights due to offloading')
self.update_nn_parameters(updated_weights)
global_offload_received = False
global_model_weights = None
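
The receiving side of the rpc_sync call above (Client.offload_receive_endpoint) lies outside the lines shown here. A plausible minimal shape, assuming the endpoint only stashes the offloaded weights in the module-level globals that the merge branch above consumes:

global_model_weights = None
global_offload_received = False

def offload_receive_endpoint(model_weights):
    # Called remotely by the offloading client; the local training loop notices the
    # flag on its next iteration and averages the received weights into its own model.
    global global_model_weights, global_offload_received
    global_model_weights = model_weights
    global_offload_received = True
    return True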

if self.deadline_enabled:
# Deadline
if train_stop_time is not None:
if time.time() >= train_stop_time:
self.args.get_logger().info('Stopping training due to deadline time')
break
# else:
# self.args.get_logger().info(f'Time to deadline: {train_stop_time - time.time()}')




inputs, labels = inputs.to(self.device), labels.to(self.device)
training_process = i
# zero the parameter gradients
self.optimizer.zero_grad()

# Ignore profile for now
# p.set_warmup(False)
# p.signal_forward_start()
# forward + backward + optimize
outputs = self.net(inputs)
loss = self.loss_function(outputs, labels)

@@ -376,15 +434,25 @@ def train(self, epoch, deadline: int = None):
est_total_time = number_of_training_samples * time_per_batch
logging.info(f'Estimated training time is {est_total_time}')
self.report_performance_estimate((time_per_batch, est_total_time, number_of_training_samples))
# logging.info(f'Batch time is {batch_duration}')

if self.freeze_layers_enabled:
logging.info(f'Checking if need to freeze layers ? {est_total_time} > {deadline}')
if est_total_time > deadline:
logging.info('Will freeze layers to speed up computation')
# This number only works for cifar10cnn
# @TODO: Make this dynamic for other networks
self.freeze_layers(15)
# logging.info(f'Batch time is {batch_duration}')
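
Condensed, the estimation logic in this loop times the first profiling_size batches, extrapolates a total training duration, and freezes layers when that estimate would miss the deadline. The helper below is an illustrative restatement of that flow, not code from the commit; names mirror the surrounding diff:

import numpy as np

def estimate_and_maybe_freeze(profiling_data, number_of_training_samples, deadline,
                              freeze_layers_enabled, freeze_fn):
    # profiling_data holds one wall-clock duration per profiled batch
    time_per_batch = float(np.mean(profiling_data))
    est_total_time = number_of_training_samples * time_per_batch
    if freeze_layers_enabled and est_total_time > deadline:
        # Predicted to miss the deadline: freeze early layers (15 works for Cifar10CNN)
        freeze_fn(15)
    return est_total_time

# Toy example: 0.2 s measured over 40 profiled batches, 390 units of work, 60 s deadline
print(estimate_and_maybe_freeze(np.full(40, 0.2), 390, 60, True, lambda n: print('freeze', n)))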

if i > 50:
break
# Break away from loop for debug purposes
# if i > 50:
# break

control_end_time = time.time()

logging.info(f'Measure end time is {(control_end_time - control_start_time)}')
logging.info(f'Trained on {training_process} samples')


self.scheduler.step()

Expand All @@ -395,7 +463,7 @@ def train(self, epoch, deadline_time: int = None):
if self.args.should_save_model(epoch):
self.save_model(epoch, self.args.get_epoch_save_end_suffix())

return final_running_loss, self.get_nn_parameters()
return final_running_loss, self.get_nn_parameters(), training_process

def test(self):
self.net.eval()
@@ -435,14 +503,11 @@ def test(self):
return accuracy, loss, class_precision, class_recall

def run_epochs(self, num_epoch, deadline: int = None):
start_time = time.time()
deadline_threshold = 10
start_time_train = datetime.datetime.now()
train_stop_time = None
if deadline is not None:
train_stop_time = start_time + deadline - deadline_threshold

self.dataset.get_train_sampler().set_epoch_size(num_epoch)
loss, weights = self.train(self.epoch_counter, train_stop_time)
# Train locally
loss, weights, training_process = self.train(self.epoch_counter, deadline)
self.epoch_counter += num_epoch
elapsed_time_train = datetime.datetime.now() - start_time_train
train_time_ms = int(elapsed_time_train.total_seconds()*1000)
Expand All @@ -452,7 +517,7 @@ def run_epochs(self, num_epoch, deadline: int = None):
elapsed_time_test = datetime.datetime.now() - start_time_test
test_time_ms = int(elapsed_time_test.total_seconds()*1000)

data = EpochData(self.epoch_counter, train_time_ms, test_time_ms, loss, accuracy, test_loss, class_precision, class_recall, client_id=self.id)
data = EpochData(self.epoch_counter, num_epoch, train_time_ms, test_time_ms, loss, accuracy, test_loss, class_precision, class_recall, training_process, self.id)
self.epoch_results.append(data)
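
EpochData gains two extra positional arguments here (the number of epochs and the training progress counter), but its definition is not part of the lines shown. A hypothetical shape matching the call above; field names are guesses, only the order and count follow the call site:

from dataclasses import dataclass

@dataclass
class EpochData:
    epoch_id: int
    num_epochs: int
    duration_train: int      # milliseconds
    duration_test: int       # milliseconds
    loss_train: float
    accuracy: float
    loss: float
    class_precision: object
    class_recall: object
    training_process: int    # batches actually processed before the loop ended
    client_id: str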

# Copy GPU tensors to CPU