10
10
from functools import partial
11
11
from pprint import pformat
12
12
13
- from . import utils
14
-
15
13
import trio
16
14
17
15
from . import report
16
+ from . import utils
18
17
19
18
log = utils .get_logger ()
20
19
@@ -26,8 +25,7 @@ class TimeoutError(Exception):
26
25
27
26
28
27
class SIPpFailure (RuntimeError ):
29
- """SIPp commands failed
30
- """
28
+ """SIPp commands failed"""
31
29
32
30
33
31
class TrioRunner :
@@ -42,13 +40,7 @@ def __init__(
42
40
# store proc results
43
41
self ._procs = OrderedDict ()
44
42
45
- async def run (
46
- self ,
47
- nursery ,
48
- cmds ,
49
- rate = 300 ,
50
- ** kwargs
51
- ):
43
+ async def run (self , nursery , cmds , rate = 300 , ** kwargs ):
52
44
if self .is_alive ():
53
45
raise RuntimeError (
54
46
"Not all processes from a prior run have completed"
@@ -59,16 +51,15 @@ async def run(
59
51
)
60
52
# run agent commands in sequence
61
53
for cmd in cmds :
62
- log .debug (
63
- "launching cmd:\n \" {}\" \n " .format (cmd )
64
- )
54
+ log .debug ('launching cmd:\n "{}"\n ' .format (cmd ))
65
55
66
56
proc = await nursery .start (
67
57
partial (
68
58
trio .run_process ,
69
59
shlex .split (cmd ),
70
60
stdout = subprocess .DEVNULL ,
71
- stderr = subprocess .PIPE
61
+ stderr = subprocess .PIPE ,
62
+ check = False ,
72
63
)
73
64
)
74
65
self ._procs [cmd ] = proc
@@ -125,15 +116,19 @@ async def wait_on_proc(proc):
125
116
# all procs were killed by SIGUSR1
126
117
raise TimeoutError (
127
118
"pids '{}' failed to complete after '{}' seconds" .format (
128
- pformat ([p .pid for p in signalled .values ()]), timeout )
119
+ pformat ([p .pid for p in signalled .values ()]), timeout
120
+ )
129
121
)
130
122
131
123
def iterprocs (self ):
132
124
"""Iterate all processes which are still alive yielding
133
125
(cmd, proc) pairs
134
126
"""
135
- return ((cmd , proc ) for cmd , proc in self ._procs .items ()
136
- if proc and proc .poll () is None )
127
+ return (
128
+ (cmd , proc )
129
+ for cmd , proc in self ._procs .items ()
130
+ if proc and proc .poll () is None
131
+ )
137
132
138
133
def stop (self ):
139
134
"""Stop all agents with SIGUSR1 as per SIPp's signal handling"""
@@ -160,8 +155,7 @@ def is_alive(self):
160
155
return any (self .iterprocs ())
161
156
162
157
def clear (self ):
163
- """Clear all processes from the last run
164
- """
158
+ """Clear all processes from the last run"""
165
159
assert not self .is_alive (), "Not all processes have completed"
166
160
self ._procs .clear ()
167
161
@@ -170,28 +164,25 @@ async def run_all_agents(
170
164
runner ,
171
165
agents ,
172
166
timeout = 180 ,
173
-
174
167
) -> TrioRunner :
175
- """Run a sequencec of agents using a ``TrioRunner``.
176
- """
168
+ """Run a sequencec of agents using a ``TrioRunner``."""
169
+
177
170
async def finalize ():
178
171
# this might raise TimeoutError
179
172
cmds2procs = await runner .get (timeout = timeout )
180
173
agents2procs = list (zip (agents , cmds2procs .values ()))
181
174
msg = report .err_summary (agents2procs )
182
175
if msg :
183
176
# report logs and stderr
184
- await report .emit_logfiles (agents2procs )
177
+ report .emit_logfiles (agents2procs )
185
178
raise SIPpFailure (msg )
186
179
187
180
return cmds2procs
188
181
189
182
try :
190
183
async with trio .open_nursery () as nurse :
191
184
await runner .run (
192
- nurse ,
193
- (ua .render () for ua in agents ),
194
- timeout = timeout
185
+ nurse , (ua .render () for ua in agents ), timeout = timeout
195
186
)
196
187
await finalize ()
197
188
return runner
@@ -201,5 +192,5 @@ async def finalize():
201
192
try :
202
193
await finalize ()
203
194
except SIPpFailure as err :
204
- assert ' exit code -9' in str (err )
195
+ assert " exit code -9" in str (err )
205
196
raise terr
0 commit comments