-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathtests.py
198 lines (159 loc) · 6.36 KB
/
tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from __future__ import print_function, division
import fnmatch
from functools import partial
import os
import re
import sys
from nose.tools import *
import dynsnap
import models
class BaseTest(object):
    """Base class for one synthetic-model test case.

    Subclasses configure class attributes:
      m      -- a model function from ``models``.  Accessing it through an
                instance turns it into a method object, hence the
                ``__func__`` unwrapping in :meth:`run`.
      ma     -- dict of model arguments.  Keys that match parameter names
                of ``SnapshotFinder.__init__`` are also forwarded to the
                finder.
      desc   -- optional description printed under the plot title.
      ann_pk -- whether to annotate the peaks in the output plot.
    """
    #m = models.model_func
    #ma = {'arg': 'value' }
    desc = None      # description for under the title
    ann_pk = False   # Annotate the peaks in the output plot?
    ma = { }
    def run(self, output, plot=True):
        """Do a run and plot results.

        output -- path prefix for the plot files.
        plot   -- when False, only iterate the finder and print intervals.
        """
        # since self.m is turned into a bound method.  I don't want to
        # decorate every single subclass with staticmethod... is there
        # a better way to handle this?
        # (__func__ is the Py2.6+/Py3 spelling; the old im_func
        # attribute exists only on Python 2.)
        model = self.m.__func__
        events = dynsnap.Events(mode='rw')
        events.add_events((t, e, 1) for t, e in model(**self.ma))
        # Create keyword arguments for the SnapshotFinder from model args.
        # (__code__ likewise replaces the Py2-only im_func.func_code.)
        varnames = dynsnap.SnapshotFinder.__init__.__code__.co_varnames
        finder_kwargs = { }
        for name in varnames:
            if name in self.ma:
                finder_kwargs[name] = self.ma[name]
        finder = dynsnap.SnapshotFinder(events,
                                        weighted=self.ma.get('w'),
                                        args=self.ma,
                                        **finder_kwargs)
        if plot:
            results = dynsnap.Results(finder,
                                      args=dict(annotate_peaks=self.ann_pk))
        # This is the core.  Iterate through until we are done with
        # all intervals.
        x = False   # only referenced by the commented-out debug dump below
        while True:
            interval = finder.find()
            if interval is None: break
            t1, t2 = interval
            #if not x:
            #    x = True
            #    for i, t in enumerate(finder._finder_data['ts']):
            #        print finder._finder_data['dts'][i], \
            #              finder._finder_data['measure_data'][i]
            if plot:
                results.add(finder)
            print(t1, t2)
            #break
        if plot:
            # Callback function to decorate the plot a little bit.
            def cb(lcls):
                title = self.__class__.__name__
                if self.ma:
                    title = "%s (%s)"%(title, " ".join("%s=%s"%(k,v)
                                       for k,v in sorted(self.ma.items())))
                if self.desc:
                    title += '\n'+self.desc
                lcls['fig'].suptitle(title)
                # plot theoretical value
                if hasattr(self, 'theory'):
                    for i in (0, 1, 2):
                        if i >= len(lcls['self'].tlows): continue
                        func = self.theory()
                        dat = lcls['self'].finding_data[i]
                        ts, xs = dat
                        tlow = lcls['self'].tlows[i]
                        # First round treated differently from future rounds
                        if i == 0:
                            # First round
                            print('first round')
                            tlow = lcls['self'].tlows[i]
                            dt_prev = 'first'
                        else:
                            dt_prev = lcls['self'].thighs[i-1] - lcls['self'].tlows[i-1]
                        predicted_xs = [ func(dt_prev, t-tlow) for t in ts ]
                        lcls['ax2'].plot(ts, predicted_xs, 'o', color='red')
            results.plot_1(output, callback=cb)
# Shorthand alias so the one-liner test-case classes below stay compact.
T = BaseTest

# Toy models: each class just binds a model function (and optionally its
# arguments) for BaseTest.run() to drive.
class toy101A(T): m=models.toy101
class toy102A(T): m=models.toy102
class toy103A(T):
    m=models.toy103; ma={'seed':13}
class toy103B(T):
    # NOTE(review): class name says toy103 but the model bound here is
    # models.toy101 -- possibly a copy-paste slip; confirm intent.
    m=models.toy101; ma={'seed':18}
    desc="has some size-8 intervals"
class toy103N(T):
    # NOTE(review): same toy101-vs-toy103 name mismatch as toy103B above.
    m=models.toy101; ma={'seed':15}
    desc="upper bound too high"
# Drift-model test cases.  Each binds models.drift with a particular
# argument set; parameter semantics ('c', 'p', 'merge_first', ...) live in
# the models module -- not visible from this file.
class drift1Am(T):
    m=models.drift; ma=dict(seed=13, merge_first=True)
class drift1A(T):
    m=models.drift; ma=dict(seed=13, merge_first=False)
class drift1B(T):
    m=models.drift; ma=dict(seed=13, c=0.02, merge_first=False)
class drift1C(T):
    m=models.drift; ma=dict(seed=13, c=0.00, p=.2, merge_first=False,
                            t_max=100, N=10000)
    def theory(self):
        # Theoretical curve overlaid on the plot by BaseTest's callback;
        # called as func(dt_prev, t - tlow).
        return lambda dtP, dtN: models.J1(dtP, dtN, Pe=partial(models.Pe, self.ma['p']))
        #return partial(models.J1_c, self.ma['c'], self.ma['p'])
class drift1D(T):
    m=models.drift; ma=dict(seed=13, t_crit=(200, 500))
class drift1E(T):
    # seed=None: run is not reproducible between invocations.
    m=models.drift; ma=dict(seed=None, c=0.01, p=.2, merge_first=False,
                            t_max=100, N=10000)
    def theory(self):
        return partial(models.J1_c, c=self.ma['c'], p=self.ma['p'])
class drift1F(T):
    # Drift parameters drawn from power-law distributions (external
    # pcd dependency, imported at class-body level).
    from pcd.support.powerlaw import PowerLaw
    cpl = PowerLaw(-.5, xmin=.05, xmax=.8)
    ppl = PowerLaw(-1, xmin=.5, xmax=.8)
    m=models.drift; ma=dict(seed=None, merge_first=False,
                            t_max=1000, N=1000, c_func=cpl.rv, p_func=ppl.rv)
class drift1G(T):
    # Test t_stop at a fixed point.
    m=models.drift; ma=dict(seed=13, c=0.02, merge_first=False,
                            tstop=500)
class drift2A(T):
    # Test t_stop at a fixed point.
    m=models.drift; ma=dict(seed=13, c=0.02, merge_first=False,
                            tstop=500, dtmode='event',
                            )
# Periodic-model test cases.
class periodic1A(T):
    m=models.periodic; ma={'N':1000, 'seed':13}; ann_pk=False
    desc='periodic'
class periodic1Aw(T):
    # 'w': True is picked up by BaseTest as weighted=True for the finder.
    m=models.periodic; ma={'N':1000, 'seed':13, 'w':True}; ann_pk=False
    desc='periodic - weighted'
class periodic1B(T):
    m=models.periodic; ma={'N':1000, 'seed':13, 't_crit':(200, 500)}
    ann_pk=False
    desc='periodic'
# Collect every concrete test case defined in this module: any subclass of
# BaseTest except the base itself, skipping underscore-prefixed names,
# ordered alphabetically by class name.
def _discover_tests():
    found = []
    for name, obj in globals().items():
        if not isinstance(obj, type):
            continue
        if not issubclass(obj, BaseTest) or obj is BaseTest:
            continue
        if name.startswith('_'):
            continue
        found.append(obj)
    found.sort(key=lambda cls: cls.__name__)
    return found
all_tests = _discover_tests()
if __name__ == '__main__':
    out_path = 'out-tests/'
    # Positional command-line arguments are regex patterns; only tests
    # whose class names match at least one pattern are run.  With no
    # arguments every test runs.
    to_run = sys.argv[1:]
    kwargs = dict(plot=True)
    for test in all_tests:
        name = test.__name__
        # Skip tests we don't want to run, if we specify this thing.
        if to_run and not any(re.search(x, name) for x in to_run):
            continue
        output = out_path+'test-'+name
        # Make sure the output directory exists before plotting into it.
        dirname = os.path.dirname(out_path)
        if dirname and not os.path.isdir(dirname):
            os.makedirs(dirname)
        obj = test()
        # __name__ works on method objects in both Python 2 and 3; the
        # original func_name attribute is Python-2-only.
        print(name, obj.m.__name__)
        obj.run(output=output, **kwargs)