-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy paththreadpool.py
117 lines (101 loc) · 3.88 KB
/
threadpool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# ThreadPool is a simple thread pool
#
# Copyright (C) 2012 Yummy Bian <yummy.bian#gmail.com>
#
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ThreadPool is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
# See the GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.";
#
import sys
import Queue
import threading
class Worker(threading.Thread):
"""Routines for work thread."""
def __init__(self, in_queue, out_queue, err_queue):
"""Initialize and launch a work thread,
in_queue which tasks in it waiting for processing,
out_queue which the return value of the task in it,
err_queue which stores error info when processing the task.
"""
threading.Thread.__init__(self)
self.setDaemon(True)
self.in_queue = in_queue
self.out_queue = out_queue
self.err_queue = err_queue
self.start()
def run(self):
while True:
# Processing tasks in the in_queue until command is stop.
command, callback, args, kwds = self.in_queue.get()
if command == 'stop':
break
try:
if command != 'process':
raise ValueError('Unknown command %r' % command)
except:
self.report_error()
else:
self.out_queue.put(callback(*args, **kwds))
def dismiss(self):
command = 'stop'
self.in_queue.put((command, None, None, None))
def report_error(self):
'''We "report" errors by adding error information to err_queue.'''
self.err_queue.put(sys.exc_info()[:2])
class ThreadPool():
"""Manager thread pool."""
max_threads = 32
def __init__(self, num_threads, pool_size=0):
"""Spawn num_threads threads in the thread pool,
and initialize three queues.
"""
# pool_size = 0 indicates buffer is unlimited.
num_threads = ThreadPool.max_threads \
if num_threads > ThreadPool.max_threads \
else num_threads
self.in_queue = Queue.Queue(pool_size)
self.out_queue = Queue.Queue(pool_size)
self.err_queue = Queue.Queue(pool_size)
self.workers = {}
for i in range(num_threads):
worker = Worker(self.in_queue, self.out_queue, self.err_queue)
self.workers[i] = worker
def add_task(self, callback, *args, **kwds):
command = 'process'
self.in_queue.put((command, callback, args, kwds))
def _get_results(self, queue):
'''Generator to yield one after the others all items currently
in the queue, without any waiting
'''
try:
while True:
yield queue.get_nowait()
except Queue.Empty:
raise StopIteration
def get_task(self):
return self.out_queue.get()
def show_results(self):
for result in self._get_results(self.out_queue):
print 'Result:', result
def show_errors(self):
for etyp, err in self._get_results(self.err_queue):
print 'Error:', etyp, err
def get_progress(self):
return self.out_queue.qsize() + self.err_queue.qsize()
def destroy(self):
# order is important: first, request all threads to stop...:
for i in self.workers:
self.workers[i].dismiss()
# ...then, wait for each of them to terminate:
for i in self.workers:
self.workers[i].join()
# clean up the workers from now-unused thread objects
del self.workers