forked from red-hat-storage/cephci
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathparallel.py
155 lines (121 loc) · 4.88 KB
/
parallel.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# -*- code: utf-8 -*-
"""
This module provides a context manager for running methods concurrently.
For backward compatability, spawn method is leveraged however one can also
choose to move submit. Likewise, thread pool executor is the default executor.
Timeout is an inherited feature provided by concurrent futures. Additionally,
one wait for all the threads/process to complete even when on thread or process
encounters an exception. This is useful when multiple test modules are
executing different test scenarios.
When a test module controls the threads then it can forcefully terminate all
threads when an exception is encountered.
Changelog:
Version 1.0 used gevent module for parallel method execution.
Version 2.0 uses concurrent.futures module instead of gevent.
You add functions to be run with the spawn method::
with parallel() as p:
for foo in bar:
p.spawn(quux, foo, baz=True)
You can iterate over the results (which are in arbitrary order)::
with parallel() as p:
for foo in bar:
p.spawn(quux, foo, baz=True)
for result in p:
print result
In version 2, you can choose whether to use threads or processes by
with parallel(thread_pool=False, timeout=10) as p:
_r = [p.spawn(quux, x) for name in names]
If one of the spawned functions throws an exception, it will be thrown
when iterating over the results, or when the with block ends.
When the scope of with block changes, the main thread waits until all
spawned functions have completed within the given timeout. On timeout,
all pending threads/processes are issued shutdown command.
"""
import logging
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from datetime import datetime, timedelta
from time import sleep
logger = logging.getLogger(__name__)
class parallel:
"""This class is a context manager for concurrent method execution."""
def __init__(
self,
thread_pool=True,
timeout=None,
shutdown_cancel_pending=False,
):
"""Object initialization method.
Args:
thread_pool (bool) Whether to use threads or processes.
timeout (int | float) Maximum allowed time.
shutdown_cancel_pending (bool) If enabled, it would cancel pending tasks.
"""
self._executor = ThreadPoolExecutor() if thread_pool else ProcessPoolExecutor()
self._timeout = timeout
self._cancel_pending = shutdown_cancel_pending
self._futures = list()
self._results = list()
self._iter_index = 0
@property
def count(self):
return len(self._futures)
@property
def results(self):
return self._results
def spawn(self, fun, *args, **kwargs):
"""Triggers the first class method.
Args:
func: Function to be executed.
args: A list of variables to be passed to the function.
kwargs A dictionary of named variables.
Returns:
None
"""
_future = self._executor.submit(fun, *args, **kwargs)
self._futures.append(_future)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, trackback):
_not_done = self._futures[:]
_end_time = datetime.now() + timedelta(
seconds=self._timeout if self._timeout else 3600
)
# Wait for all futures to complete within the given time or 1 hour.
while datetime.now() < _end_time:
# if the list is empty break
if len(_not_done) == 0:
break
sleep(2.0)
for _f in _not_done:
if _f.done():
_not_done.remove(_f)
# Graceful shutdown of running threads
if _not_done:
self._executor.shutdown(wait=False, cancel_futures=self._cancel_pending)
if exc_value is not None:
logger.exception(trackback)
return False
# Check for any exceptions and raise
# At this point, all threads/processes should have completed or cancelled
try:
for _f in self._futures:
self._results.append(_f.result())
except Exception:
logger.exception("Encountered an exception during parallel execution.")
raise
return True
def __iter__(self):
return self
def __next__(self):
if self.count == 0 or self._iter_index == self.count:
self._iter_index = 0 # reset the counter
raise StopIteration()
try:
# Keeping timeout consistent when called within the context
_timeout = self._timeout if self._timeout else 3600
out = self._futures[self._iter_index].result(timeout=_timeout)
except Exception as e:
logger.exception(e)
out = e
self._iter_index += 1
return out