-
Notifications
You must be signed in to change notification settings - Fork 0
/
pyThreadsEx.py
312 lines (276 loc) · 10.8 KB
/
pyThreadsEx.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
from functools import wraps
from logging import debug
from queue import Queue, Empty, Full
from threading import Thread, Lock, Condition
from time import sleep
def serialize(__mutex):
    '''
    Decorator factory which serialises access to the decorated function:
    every call is made while holding ``__mutex`` (any context-manager
    lock such as threading.Lock), so at most one thread executes the
    function at a time.

    Raises ValueError if no mutex is supplied.
    '''
    # Validate eagerly with a real exception: an ``assert`` would be
    # silently stripped under ``python -O``.
    if __mutex is None:
        raise ValueError("serialize requires a mutex")
    def __serialize(func):
        @wraps(func)  # preserve func's name/docstring on the wrapper
        def wrapped(*args, **kwargs):
            # Hold the lock for the full duration of the call.
            with __mutex:
                return func(*args, **kwargs)
        return wrapped
    return __serialize
class thread_pool_stopped_exception(Exception):
    '''
    Raised on any attempt to use a thread-pool that has already
    been stopped (joined).
    '''
    def __init__(self, message="Cannot add new threads to a stopped thread-pool."):
        # The original left the message as a dead bare-string statement;
        # feed it through Exception so str(e) is informative.
        super().__init__(message)
class thread_pool_full_exception(Exception):
    '''
    Raised on any attempt to add a new task to a thread-pool whose
    bounded task queue is already full.
    '''
    def __init__(self, message="Cannot add new task to thread-pool as it is full."):
        # The original left the message as a dead bare-string statement;
        # feed it through Exception so str(e) is informative.
        super().__init__(message)
class thread_pool(object):
    '''
    Thread-pool implementation which has the following features:
    - Dynamic scaling of threads to the current work load
    - Thread-pool tasks which act as futures (see thread_pool_task)
    - Fully join-able, usable as a context manager

    The pool starts ``count`` worker threads plus one monitor thread
    which grows the pool when queued tasks outnumber threads and
    shrinks it back towards the minimum when the queue is idle.
    '''
    ############################################################################
    class thread_pool_task(object):
        '''
        A single unit of work to be undertaken by a thread-pool.
        Returned by thread_pool.process() and acts as a future: the
        blocking get() call returns the result of the wrapped callable,
        or re-raises any exception the callable threw.
        '''
        def __init__(self, tid, func, completion_handler, *args, **kwargs):
            # tid                - numeric id used only for logging
            # func               - the callable to execute
            # completion_handler - invoked exactly once when run() finishes
            #                      (the pool passes Queue.task_done here)
            self.__tid = tid
            self.__func = func
            self.__on_complete = completion_handler
            self.__args = args
            self.__kwargs = kwargs
            self.__result = None
            self.__exception = None
            self.__completed = False
            self.__completion_condition = Condition()

        def run(self):
            '''Execute the wrapped callable and publish its result.'''
            debug("Starting execution of task %d" % self.__tid)
            try:
                self.__result = self.__func(*self.__args, **self.__kwargs)
            except Exception as e:
                # Stash the exception so get() can re-raise it in the caller.
                self.__exception = e
                debug("Exception %s thrown when executing task %d." % (e, self.__tid))
            # Publish completion under the condition so waiters cannot
            # miss the notification.
            with self.__completion_condition:
                self.__completed = True
                self.__on_complete()
                self.__completion_condition.notify_all()
            debug("Completed task %d" % self.__tid)

        def get(self):
            '''
            Block until the task has run, then return its result.
            Re-raises any exception raised by the task's callable.
            '''
            with self.__completion_condition:
                # Loop (not ``if``) to guard against spurious wake-ups.
                while not self.__completed:
                    self.__completion_condition.wait()
            if self.__exception is not None:
                raise self.__exception
            return self.__result

    ############################################################################
    class thread_pool_thread(Thread):
        '''
        A single worker thread owned by a thread-pool.  Repeatedly pops
        tasks from the pool until signalled complete.
        '''
        def __init__(self, idx, pool):
            Thread.__init__(self, group=None, name=str(idx),
                            args=(), kwargs={})
            self.__pool = pool
            self.__complete = False

        def sig_is_complete(self):
            '''Ask this thread to exit after its current pop attempt.'''
            self.__complete = True

        def run(self):
            # Process tasks while our thread-pool is still alive.
            # pop() uses a short timeout so the completion flag is
            # re-checked regularly.
            while not self.__complete:
                task = self.__pool.pop()
                if task is not None:
                    task.run()

    ############################################################################
    def __init__(self, count, max_tasks=0, monitor_interval=1.0):
        '''
        count            - minimum (and initial) worker thread count;
                           must be non-zero
        max_tasks        - maximum queued tasks (0 == unbounded)
        monitor_interval - seconds between monitor re-balancing passes
        '''
        # Validate with a real exception: ``assert`` is stripped by -O.
        if count == 0:
            raise ValueError("thread_pool requires at least one thread")
        self.__complete = False
        self.__mutex = Lock()
        self.__next_tid = 1
        self.__threads = []
        self.__tasks = Queue(maxsize=max_tasks)
        self.__monitor_interval = monitor_interval
        self.__monitor_thread = Thread(target=self.__monitor__,
                                       name="ThreadPool Monitor")
        self.__min_thread_count = count
        self.__most_thread_count = count
        for i in range(count):
            thread = thread_pool.thread_pool_thread(i, self)
            self.__threads.append(thread)
            thread.start()
        self.__monitor_thread.start()

    def stats(self):
        '''Return a snapshot dictionary of thread-pool statistics.'''
        return {"MinThreads": self.__min_thread_count,
                "MaxThreads": self.__most_thread_count,
                "TaskCount": self.__tasks.qsize()}

    def __monitor__(self):
        '''
        Background re-balancing loop: adds a thread when queued tasks
        outnumber threads, removes one when the queue is idle and the
        pool is above its minimum size.
        '''
        while not self.is_complete():
            with self.__mutex:
                # Snapshot queue and threads under the mutex.
                task_count = self.__tasks.qsize()
                thread_count = len(self.__threads)
                if task_count > thread_count:
                    # More tasks queued than threads: grow to meet demand.
                    debug("Adding new thread in monitor: tasks = %d, threads = %d"
                          % (task_count, thread_count))
                    try:
                        self.__add_threads(1)
                    except thread_pool_stopped_exception:
                        pass  # pool is shutting down
                elif task_count == 0 and thread_count > self.__min_thread_count:
                    # Idle queue: slowly contract back to the minimum.
                    debug(
                        "Removing existing thread in monitor: tasks = %d, threads = %d"
                        % (task_count, thread_count))
                    try:
                        self.__remove_threads(1)
                    except thread_pool_stopped_exception:
                        pass  # pool is shutting down
            sleep(self.__monitor_interval)

    def thread_count(self):
        '''Return the current number of worker threads (thread-safe).'''
        with self.__mutex:
            return len(self.__threads)

    def max_tasks(self):
        '''Return the task-queue capacity (0 == unbounded).'''
        return self.__tasks.maxsize

    def task_count(self):
        '''Return the approximate number of queued tasks.'''
        return self.__tasks.qsize()

    def add_threads(self, count):
        '''Grow the pool by ``count`` worker threads (thread-safe).'''
        with self.__mutex:
            self.__add_threads(count)

    def __add_threads(self, count):
        # Caller must hold self.__mutex.
        if self.is_complete():
            # Module-level exception (the original referenced it as a
            # non-existent class attribute, which raised AttributeError).
            raise thread_pool_stopped_exception()
        # Honour ``count`` (the original always added exactly one thread)
        # and name each thread after its index in the pool.
        for _ in range(count):
            thread = thread_pool.thread_pool_thread(len(self.__threads), self)
            self.__threads.append(thread)
            thread.start()
        thread_count = len(self.__threads)
        if thread_count > self.__most_thread_count:
            self.__most_thread_count = thread_count

    def remove_threads(self, count):
        '''
        Shrink the pool by up to ``count`` threads, never going below
        the configured minimum.  Returns the number actually removed.
        '''
        with self.__mutex:
            return self.__remove_threads(count)

    def __remove_threads(self, count):
        # Caller must hold self.__mutex.
        if self.is_complete():
            raise thread_pool_stopped_exception()
        current = len(self.__threads)
        # Clamp so the pool never shrinks below its minimum size
        # (the original test was inverted and removed too many threads).
        result = max(0, min(count, current - self.__min_thread_count))
        for _ in range(result):
            thread_to_dispose = self.__threads.pop()
            thread_to_dispose.sig_is_complete()
            thread_to_dispose.join()
        return result

    def process(self, func, *args, **kwargs):
        '''
        Queue func(*args, **kwargs) for execution on the pool and
        return a thread_pool_task future for its result.

        Raises thread_pool_stopped_exception if the pool has been
        joined, and thread_pool_full_exception if the bounded task
        queue is full.
        '''
        with self.__mutex:
            if self.is_complete():
                raise thread_pool_stopped_exception()
            tid = self.__next_tid
            self.__next_tid = self.__next_tid + 1
            debug("Adding new thread-pool task. %d already queued." % self.task_count())
            # Queue.task_done is the completion handler so join() can
            # drain the queue via Queue.join().
            task = thread_pool.thread_pool_task(tid, func, self.__tasks.task_done,
                                                *args, **kwargs)
            # An unbounded queue (maxsize == 0) may block safely; a
            # bounded queue must fail fast so fullness is reported.
            block = self.__tasks.maxsize == 0
            debug("Queueing task with blocking as %d" % block)
            try:
                self.__tasks.put(task, block=block)
            except Full:
                debug("Failed to add task %d as task queue full" % tid)
                raise thread_pool_full_exception()
        return task

    def pop(self):
        '''
        Return the next queued task, or None if none arrives within
        the short polling timeout (lets workers re-check shutdown).
        '''
        try:
            return self.__tasks.get(timeout=0.01)
        except Empty:  # empty queue, ignore
            return None

    def is_complete(self):
        '''True once join() has begun shutting the pool down.'''
        return self.__complete

    def join(self):
        '''
        Stop accepting work, wait for all queued tasks to finish, then
        shut down the monitor and every worker thread.
        '''
        with self.__mutex:
            self.__complete = True
        # Join OUTSIDE the mutex: the monitor thread acquires the mutex
        # to make progress, so holding it here could deadlock shutdown.
        self.__tasks.join()
        self.__monitor_thread.join()
        for t in self.__threads:
            t.sig_is_complete()  # signal completion
            t.join()             # then join

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.join()
class no_target_exception(Exception):
    '''
    Raised when a future is constructed without a target callable.
    '''
    def __init__(self, name=None):
        # name - optional future name, appended to the message.
        self.message = "No target for specified future"
        if name is not None:
            self.message = self.message + " (" + name + ")"
        # Route the message through Exception so str(e) shows it
        # (the original left the exception with an empty str()).
        super().__init__(self.message)
class future(Thread):
    '''
    A future evaluates a target callable asynchronously on its own
    thread, started immediately on construction.  The result of the
    asynchronous calculation is retrieved with get(), which joins the
    thread and re-raises any exception thrown by the target.

    Usable as a context manager: leaving the ``with`` block waits for
    (and checks) the result.
    '''
    def __init__(self, target=None, group=None, name=None,
                 args=(), kwargs=None):
        '''
        target - callable to run; raises no_target_exception if None
        group, name, args, kwargs - as for threading.Thread
        '''
        if target is None:
            raise no_target_exception(name)
        # Use None as the default to avoid a shared mutable dict.
        kwargs = {} if kwargs is None else kwargs
        Thread.__init__(self, group, target, name,
                        args, kwargs)
        self.__target = target
        self.__args = args
        self.__kwargs = kwargs
        self.__retval = None     # return value of the call
        self.__exception = None  # exception raised by the call, if any
        # The future starts evaluating immediately.
        self.start()

    def run(self):
        try:
            self.__retval = self.__target(*self.__args, **self.__kwargs)
        except Exception as e:
            # Stash the exception; get() re-raises it in the caller.
            self.__exception = e

    def get(self, timeout=None):
        '''
        Wait up to ``timeout`` seconds (forever if None) for the
        asynchronous calculation, returning its result or re-raising
        any exception the target threw.
        '''
        self.join(timeout)
        if self.__exception is not None:
            raise self.__exception
        return self.__retval

    def __enter__(self):
        return self

    def __exit__(self, *args):
        # The original omitted ``self`` from this signature, so every
        # ``with`` exit failed with a NameError.
        self.get()