task_scheduler.py
#!/usr/bin/env python
import sys
import threading
from enum import Enum

import pandas

# The star imports below also supply helpers used throughout this file
# (farmer_log, execute, run_docker, get_random_container, and the
# workload classes), so they are kept wholesale.
from xml_parser import *
from MySql_wrapper import Mysql_wrapper
from cmd_generator import *
from workload import *
from docker_control import Docker_Monitor
from gpu_control import *
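

# Task_Scheduler queues benchmark requests parsed from XML request files,
# schedules each onto a free GPU, runs the workload inside a docker
# container, and persists the results through the MySQL wrapper.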
class Task_Scheduler(object):
    # Task lifecycle states, created with the stdlib functional Enum API
    # (type name first, then the list of member names).
    TASK_STATE = Enum("TASK_STATE", ["Pending", "Running", "Finished"])

    def __init__(self):
        self.sql_wrapper = Mysql_wrapper('DPMSystem.sh.intel.com', 'root', 'tracing', 'automations_test')
        self.docker_control = Docker_Monitor()
        self.gpu_monitor = GPUMonitor()
        self.gpu_monitor.init_local_gpu_lists()
        self.gpu_monitor.register_listener(self)
        self.requests = {}
        self.pending_requests = []
        self.prepare_env()
        self.lock = threading.Lock()

    def prepare_env(self):
        self.sql_wrapper.init_database()
        self.docker_control.get_local_images(self.sql_wrapper)

    def build_image(self):
        # TODO: build a docker image matching the requested cuda/cudnn and
        # framework versions when no prebuilt one exists locally.
        pass

    def create_account(self, user, password, mail):
        if self.sql_wrapper.exists_mail(mail):
            return False
        self.sql_wrapper.create_account(user, password, mail)
        return True

    def exists_account(self, mail, password):
        return self.sql_wrapper.account_login(mail, password)

    def parse_new_request_from_xml(self, filepath):
        xml_parser = XMLParser(filepath)
        request = xml_parser.parse_xml()
        gpu_device = self.gpu_monitor.get_gpu_from_model(request['gpu_model'])
        if gpu_device is None:
            farmer_log.error('Internal Fatal Error, Wrong GPU Model Name')
            raise Exception('Internal Fatal Error')
        # Add bookkeeping keys to the request dict.
        request['gpu_id'] = gpu_device.gpuid
        request['gpu_device'] = gpu_device
        request['history_temperature'] = []
        request['history_freq'] = []
        request['history_power'] = []
        request['raw_buffer'] = bytearray()
        return request
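
    # For reference, the request fields consumed elsewhere in this class
    # (the XML schema itself lives in xml_parser.XMLParser): 'request_id',
    # 'gpu_model', 'framework' ('caffe' or 'tensorflow'), 'topology',
    # 'iterations', 'batch_size', 'profiling', 'source', 'cuda_string',
    # 'cudnn_string', 'caffe', 'tensorflow', and 'email'.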

    def assign_request(self, filepath):
        """
        The scheduler tries to assign one request to one specified GPU.
        """
        # Multiple threads/processes may call assign_request concurrently,
        # so the shared request tables are only mutated under the lock.
        with self.lock:
            request = self.parse_new_request_from_xml(filepath)
            request["state"] = self.__class__.TASK_STATE.Pending
            self.requests[request['request_id']] = request
            self.pending_requests.append(request)
        t = threading.Thread(target=self.schedule)
        t.start()

    def schedule(self):
        """
        Only one thread can run here at a time: the lock is held for the
        whole pass, so a running test also serializes any concurrent
        assign_request calls.
        """
        with self.lock:
            # Iterate over a copy; removing from the list while iterating
            # it directly would skip the entry after each removal.
            for request in list(self.pending_requests):
                if self.request_runnable(request):
                    request['gpu_device'].blocked = True
                    request["state"] = self.__class__.TASK_STATE.Running
                    self.test_start(request)
                    request['gpu_device'].blocked = False
                    self.pending_requests.remove(request)

    def request_runnable(self, request):
        return not request['gpu_device'].blocked

    def workload_run(self, container, request):
        gpuid = request['gpu_id']
        request_id = request['request_id']
        print(request['framework'])
        if request['framework'] == 'caffe':
            test_workload = Caffe_Workload(container, request_id, request['profiling'])
        else:
            test_workload = Tensorflow_Workload(container, request_id, request['profiling'])
        test_workload.copy()
        results = test_workload.run_batch(request['topology'], request['iterations'],
                                          request['batch_size'], gpuid,
                                          request['raw_buffer'], request['source'])
        return results

    def test_start(self, request):
        index = self.docker_control.get_image_index(request['cuda_string'], request['cudnn_string'], request['caffe'], request['tensorflow'])
        if index == -1:
            # TODO: build the missing image, then have docker control
            # insert the docker_image_info row into the database.
            self.build_image()
            index = self.docker_control.get_image_index(request['cuda_string'], request['cudnn_string'], request['caffe'], request['tensorflow'])
        request_id = request['request_id']
        image = self.docker_control.get_image(index)
        container = get_random_container()
        execute(run_docker(container, image.repository, image.tag))
        results = self.workload_run(container, request)
        self.sql_wrapper.inert_item_in_request_reports(request_id, container,
                request["gpu_model"], request["email"], request["framework"],
                request["topology"], request["batch_size"], request["iterations"])
        for result in results:
            self.sql_wrapper.inert_item_in_result_reports(
                request_id,
                container,
                request["gpu_model"],
                result['framework'],
                result['topology'],
                result['batch_size'],
                result['source'],
                result['iterations'],
                result['score'],
                result['training_images_per_second'])
        # execute(stop_docker(container))
        request["state"] = self.__class__.TASK_STATE.Finished
        return True

    def response_gpu_state_request(self, request_id):
        request = self.requests[request_id]
        gpu_device = request['gpu_device']
        return gpu_device.response_status_as_json(request)

    def make_download_file(self, request_id):
        data_mediator = self.sql_wrapper.get_result_by_request_id(request_id)
        data_frame = pandas.DataFrame(data_mediator.to_data_frame())
        filename = request_id + ".xlsx"
        data_frame.to_excel("./xlsx/" + filename, index=False)
        return filename
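

# Usage sketch: pass the path of one request XML file as the only
# command-line argument, e.g. (hypothetical file name):
#
#     python task_scheduler.py ./requests/resnet50.xml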

if __name__ == "__main__":
    scheduler = Task_Scheduler()
    scheduler.assign_request(sys.argv[1])
    print("print buffer")