forked from urihoenig/capnproto
-
Notifications
You must be signed in to change notification settings - Fork 0
/
mega-test.py
executable file
·144 lines (118 loc) · 3.83 KB
/
mega-test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#! /usr/bin/env python
# MEGA TEST
#
# usage: mega-test.py <config>
#
# This runs several tests in parallel and shows progress bars for each, based on a config file.
#
# <config> is a file containing a list of commands to run along with the expected number of lines
# they will output (to stdout and stderr combined), which is how the progress bar is calculated.
# The format of the file is simply one test per line, with the line containing the test name,
# the number of output lines expected, and the test command. Example:
#
# mytest 1523 ./my-test --foo bar
# another 862 ./another-test --baz
#
# Each command is interpreted by `sh -euc`, therefore it is acceptable to use environment
# variables and other shell syntax.
#
# After all tests complete, the config file will be rewritten to update the line counts to the
# actual number of lines seen for all passing tests (failing tests are not updated).
import sys
import re
import os
from errno import EAGAIN
from fcntl import fcntl, F_GETFL, F_SETFL
from select import poll, POLLIN, POLLHUP
from subprocess import Popen, PIPE, STDOUT
CONFIG_LINE = re.compile("^([^ ]+) +([0-9]+) +(.*)$")
if len(sys.argv) != 2:
sys.stderr.write("Wrong number of arguments.\n");
sys.exit(1)
if not os.access("/tmp/test-output", os.F_OK):
os.mkdir("/tmp/test-output")
config = open(sys.argv[1], 'r')
tests = []
class Test:
def __init__(self, name, command, lines):
self.name = name
self.command = command
self.lines = lines
self.count = 0
self.done = False
def start(self, poller):
self.proc = Popen(["sh", "-euc", test.command], stdin=dev_null, stdout=PIPE, stderr=STDOUT)
fd = self.proc.stdout.fileno()
flags = fcntl(fd, F_GETFL)
fcntl(fd, F_SETFL, flags | os.O_NONBLOCK)
poller.register(self.proc.stdout, POLLIN)
self.log = open("/tmp/test-output/" + self.name + ".log", "w")
def update(self):
try:
while True:
text = self.proc.stdout.read()
if text == "":
self.proc.wait()
self.done = True
self.log.close()
return True
self.count += text.count("\n")
self.log.write(text)
except IOError as e:
if e.errno == EAGAIN:
return False
raise
def print_bar(self):
percent = self.count * 100 / self.lines
status = "(%3d%%)" % percent
color_on = ""
color_off = ""
if self.done:
if self.proc.returncode == 0:
color_on = "\033[0;32m"
status = "PASS"
else:
color_on = "\033[0;31m"
status = "FAIL: /tmp/test-output/%s.log" % self.name
color_off = "\033[0m"
print "%s%-16s |%-25s| %6d/%6d %s%s " % (
color_on, self.name, '=' * min(percent / 4, 25), self.count, self.lines, status, color_off)
def passed(self):
return self.proc.returncode == 0
for line in config:
if len(line) > 0 and not line.startswith("#"):
match = CONFIG_LINE.match(line)
if not match:
sys.stderr.write("Invalid config syntax: %s\n" % line);
sys.exit(1)
test = Test(match.group(1), match.group(3), int(match.group(2)))
tests.append(test)
config.close()
dev_null = open("/dev/null", "rw")
poller = poll()
fd_map = {}
for test in tests:
test.start(poller)
fd_map[test.proc.stdout.fileno()] = test
active_count = len(tests)
def print_bars():
for test in tests:
test.print_bar()
print_bars()
while active_count > 0:
for (fd, event) in poller.poll():
if fd_map[fd].update():
active_count -= 1
poller.unregister(fd)
sys.stdout.write("\033[%dA\r" % len(tests))
print_bars()
new_config = open(sys.argv[1], "w")
for test in tests:
if test.passed():
new_config.write("%-16s %6d %s\n" % (test.name, test.count, test.command))
else:
new_config.write("%-16s %6d %s\n" % (test.name, test.lines, test.command))
for test in tests:
if not test.passed():
sys.exit(1)
sys.exit(0)