Skip to content

Commit

Permalink
Make experiments reproducbile for batch-generation
Browse files Browse the repository at this point in the history
  • Loading branch information
JMGaljaard committed Sep 16, 2022
1 parent ae71baa commit 38683a0
Showing 1 changed file with 36 additions and 18 deletions.
54 changes: 36 additions & 18 deletions fltk/util/task/generator/arrival_generator.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import collections
import logging
import multiprocessing
import random
import time
from abc import abstractmethod
from dataclasses import dataclass
from datetime import timedelta
from pathlib import Path
from queue import Queue
from random import choices
from typing import Dict, List, Union, OrderedDict, Optional
from typing import Dict, List, Union, OrderedDict, Optional, Any

import numpy as np

Expand Down Expand Up @@ -45,17 +46,21 @@ def load_config(self):
self.job_dict = collections.OrderedDict(
{f'train_job_{indx}': item for indx, item in enumerate(experiment_descriptions.train_tasks)})

def start(self, duration: Union[float, int]):
def start(self, args: List[Any], kwds: Dict[str, Any]):
"""
Function to start arrival generator, requires to
@param args: List of arguments to pass to the arrival generator at generation time.
@type args: List[Any]
@param kwds: Dictionary of keyword arguments to pass to the arrival generator at generation time.
@type: kwds: Dict[str, Any]
@return: None
@rtype: None
"""
if not self.logger:
self.set_logger()
self.logger.info("Starting execution of arrival generator...")
self.logger.info("Starting execution of arrival generator.")
self.alive = True
self.run(duration)
self.run(*args, **kwds)

def stop(self) -> None:
"""
Expand All @@ -67,11 +72,13 @@ def stop(self) -> None:
self.alive = False

@abstractmethod
def run(self, duration: float):
def run(self, duration: float, seed: Optional[int] = None) -> None:
"""
Abstract function to run experiment generator for a specified time duration.
@param duration: Time in seconds to run experiment generation.
@type duration: int
@param seed: Optional seed to be used depending on implementation.
@type seed: Optional[int]
@return: None
@rtype: None
"""
Expand Down Expand Up @@ -173,13 +180,21 @@ def generate_arrival(self, task_id: str, inter_arrival_unit: timedelta = timedel

return Arrival(inter_arrival_ticks, train_task, task_id)

def run(self, duration: float):
def run(self, duration: float, seed: Optional[int] = None) -> None:
"""
Run function to generate arrivals during existence of the Orchestrator. Accounts time-drift correction for
long-term execution duration of the generator (i.e. for time taken by Python interpreter).
Simulated arrivals utilize a random seed as starting point to provide seed,
@param duration: How long (in seconds) the experiment will run, tasks will not be killed if they run longer
than the experiment, but will be waited on.
@type duration float
@param seed: Integer seed, WILL BE IGNORED. See also SequentialArrivalGenerator.
@type seed: Optional[int]
@return: None
@rtype: None
"""
if seed:
self.logger.warning(f"Was provided seed: {seed}, is not supported for Simulated Arrivals.")
self.start_time = time.time()
self.logger.info("Populating tick lists with initial arrivals")
for task_id in self.job_dict.keys():
Expand Down Expand Up @@ -232,29 +247,32 @@ def set_logger(self, name: str = None):
logging_name = name or self.__class__.__name__
self.logger = logging.getLogger(logging_name)

def run(self, duration: float):
def run(self, duration: float, seed: Optional[int] = None) -> None:
"""
Helper method to start experiments. Curent implementations only runs without duration. I.e. this method
runs in a fire-and-forget fashion without obeying to the duration parameter that may have been set.
@param duration:
@type duration:
@return:
@rtype:
@param duration: Time in seconds to run the generator. IGNORED
@type duration: float
@param seed: Seed to be passed to a repeatable experiment. E.g. Federated Learning experiments.
@type seed: Optional[int]
@return: None
@rtype: None
"""
self.start_time = time.time()

description: JobDescription
if not seed:
replace_seed = random.randint(0, (2 ** 32) - 2)
logging.warning(f"Cannot generate repeatable experiments without seed, will set: {replace_seed}")
seed = replace_seed
for job_name, description in self.job_dict.items():
# TODO: Ensure seeds are set properly
raise NotImplementedError("Run is to be re-implemented for BatchedArrivals in an upcomming release")
for repl, seed in enumerate(description.job_class_parameters.experiment_configuration.random_seed):
for repl, job_class_param in enumerate(description.job_class_parameters):
replication_name = f"{job_name}_{repl}_{seed}"
train_task = TrainTask(identity=replication_name,
job_parameters=description.job_class_parameters,
job_parameters=job_class_param,
priority=description.priority,
# experiment_config=description.get_experiment_configuration(),
replication=repl,
experiment_type=description.experiment_type)
experiment_type=description.experiment_type,
seed=seed)

arrival = Arrival(None, train_task, job_name)
self.arrivals.put(arrival)

0 comments on commit 38683a0

Please sign in to comment.