-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathexperiment.py
140 lines (114 loc) · 4.5 KB
/
experiment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import time
import math
import datetime
import random
import logging
import monitoring
import job_operations
from parameters import backend_experiment_db, JOB_QUEUE_PREFIX
from celery import subtask
logger = logging.getLogger(__name__)
class Experiment:
""" Experiment Class
Holds an experiment
"""
def __init__(self, experiment_id, experiment):
""" init method """
# Reset stating time
self.jqueuer_task_added_count = 0
self.jqueuer_job_added_count = 0
# Assigning experiment ID
self.experiment_id = experiment_id
# Assigning the desired software from the experiment to a variable
try:
self.service_name = experiment["container_name"]
except KeyError:
logger.error("-JQueuer- No container_name specified in JSON!")
raise KeyError
self.add_service(self.service_name)
self.experiment = experiment
experiment_adding_timestamp = self.time_now()
monitoring.experiment_adding_timestamp(
self.experiment_id, self.service_name, experiment_adding_timestamp
)
deadline = experiment_adding_timestamp + self.experiment.get('experiment_deadline', 0)
monitoring.experiment_deadline(
self.experiment_id, self.service_name, deadline
)
duration = self.experiment.get('single_task_duration', 300)
monitoring.experiment_task_duration(
self.experiment_id, self.service_name, duration
)
def time_now(self):
return datetime.datetime.now().timestamp()
def add_service(self, service_name):
""" Add the service name to the backend (redis) database """
if backend_experiment_db.exists(service_name):
logger.error("-JQueuer- This service already has an experiment, delete it first")
backend_experiment_db.hmset(service_name, {"experiment_id": self.experiment_id})
def process_jobs(self):
""" decide whether the jobs are stored in a list or an array """
if isinstance(self.experiment["jobs"], list):
self.process_job_list()
else:
self.process_job_array()
self.task_per_job_avg = math.ceil(
self.jqueuer_task_added_count / self.jqueuer_job_added_count
)
def process_job_list(self):
""" process all jobs in the list """
for job in self.experiment["jobs"]:
try:
job_params = job["params"]
except Exception as e:
job["params"] = self.experiment["params"]
try:
job_command = job["command"]
except Exception as e:
job["command"] = self.experiment["command"]
self.add_job(job)
def process_job_array(self):
""" process job array """
jobs = self.experiment["jobs"]
try:
job_params = jobs["params"]
except Exception as e:
jobs["params"] = self.experiment["params"]
try:
job_command = jobs["command"]
except Exception as e:
jobs["command"] = self.experiment["command"]
for x in range(0, jobs["count"]):
job_id = jobs["id"] + "_" + str(x)
self.add_job(jobs, job_id)
def add_job(self, job, job_id=None):
""" Add a job (and its tasks) to the queue and update the monitoring counters """
if not job_id:
job_id = job["id"]
self.add_tasks(job["tasks"], job_id)
self.jqueuer_job_added_count += 1
monitoring.add_job(self.experiment_id, self.service_name, job_id)
job_queue_id = (
"j_"
+ self.service_name
+ "_"
+ str(int(round(time.time() * 1000)))
+ "_"
+ str(random.randrange(100, 999))
)
chain = subtask(
"job_operations.add", queue=JOB_QUEUE_PREFIX + self.service_name
)
chain.delay(self.experiment_id, job_queue_id, job)
def add_tasks(self, tasks, job_id):
""" Count the tasks in a job and update the counters """
for task in tasks:
self.jqueuer_task_added_count += 1
monitoring.add_task(
self.experiment_id, self.service_name, job_id, task["id"]
)
def start(self):
""" Start the experiment """
self.process_jobs()
logger.info("-JQueuer- Added experiment ID {}".format(self.experiment_id))
logger.info("-JQueuer- Will try to run on container: {}".format(self.service_name))