-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathunittest_arrays.py
119 lines (105 loc) · 5.97 KB
/
unittest_arrays.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import unittest
import numpy as np
import mlframe.arrays as m
import mlframe.tests as tests
# Shared parameters for the random test data used throughout this module.
# Values are drawn with np.random.randint(minElem, maxElem, arrSize), i.e. the
# upper bound is exclusive and the largest generated value is maxElem - 1.
minElem = 50
maxElem = 1000
# Number of elements in every generated test array.
arrSize = 1000000
def baselineArgSort(vals):
    """Return the positions that sort ``vals``, as computed by ``np.argsort``.

    Serves as the NumPy reference the optimized arg-sort routines are timed
    against.
    """
    order = np.argsort(vals)
    return order
def baselineArgSortIndexed(vals, indices):
    """Return ``indices`` reordered so that ``vals[result]`` is ascending.

    Only the elements of ``vals`` selected by ``indices`` are considered; the
    returned array contains the same entries as ``indices``, permuted by the
    ``np.argsort`` order of the selected values.
    """
    selected = vals[indices]
    order = np.argsort(selected)
    return indices[order]
class Test(unittest.TestCase):
    """Validity and timing checks for the array helpers in ``mlframe.arrays``.

    Every test first compares the helper's output with a NumPy reference on
    random integer data, then (where applicable) times the helper and the
    NumPy baseline through ``tests.repeatManyTimes`` and prints both results.

    NOTE(review): ``tests.repeatManyTimes`` is assumed to return a timing
    measurement where smaller means faster -- confirm against mlframe.tests.
    """

    def _checkCountingArgSort(self, argSort, label, ordering=None):
        """Run the shared validity and timing checks for a counting arg-sort.

        Args:
            argSort: Callable under test. It is invoked as
                ``argSort(x, maxElem)`` for the whole array and as
                ``argSort(x, maxElem, indices)`` for an indexed subset.
            label: Name of the function, used in the printed timing report.
            ordering: Optional callable that extracts the ordering array from
                the value returned by ``argSort``. ``None`` means the return
                value itself is the ordering.
        """
        x = np.random.randint(minElem, maxElem, arrSize)

        def sortedValues(*selection):
            # Values of x taken in the order produced by the function under test.
            result = argSort(x, maxElem, *selection)
            if ordering is not None:
                result = ordering(result)
            return x[result]

        # Validity
        # The sorted *values* are compared rather than the positions, which
        # makes the check insensitive to how equal elements are ordered.
        # whole array
        np.testing.assert_array_equal(sortedValues(), x[baselineArgSort(x)])
        # indexed array: draw unique *positions* of x. Passing the population
        # size (not x itself) is what makes np.random.choice return positions
        # instead of sampled values.
        indices = np.random.choice(arrSize, arrSize // 5, replace=False)
        np.testing.assert_array_equal(
            sortedValues(indices), x[baselineArgSortIndexed(x, indices)]
        )

        # Performance: whole array first, then the indexed subset.
        for scope, baseline, selection in (
            ("whole array", baselineArgSort, ()),
            ("indexed array", baselineArgSortIndexed, (indices,)),
        ):
            baselineTime = tests.repeatManyTimes(baseline, x, *selection)
            print("Baseline np.argsort test for %s took %s" % (scope, baselineTime))
            optimizedTime = tests.repeatManyTimes(argSort, x, maxElem, *selection)
            print("Optimized %s test for %s took %s" % (label, scope, optimizedTime))
            self.assertLess(optimizedTime, baselineTime)

    def test_arrayMinMax(self):
        """arrayMinMax returns the (min, max) pair of an array or of a sub-range."""
        # Validity
        # randint's upper bound is exclusive, hence maxElem - 1. The check
        # relies on both extremes occurring among the arrSize random draws.
        self.assertEqual(
            m.arrayMinMax(np.random.randint(minElem, maxElem, arrSize)),
            (minElem, maxElem - 1),
        )
        # NOTE(review): 10 and 15 look like a half-open [start, stop) position
        # range, given the expected (10, 14) -- confirm against mlframe.arrays.
        self.assertEqual(m.arrayMinMax(np.arange(20), 10, 15), (10, 14))

    def test_arrayMinMaxParallel(self):
        """arrayMinMaxParallel matches the expected (min, max); timings are printed."""
        # Validity
        self.assertEqual(
            m.arrayMinMaxParallel(np.random.randint(minElem, maxElem, arrSize)),
            (minElem, maxElem - 1),
        )
        self.assertEqual(m.arrayMinMaxParallel(np.arange(20), 10, 15), (10, 14))

        # Performance
        x = np.random.rand(arrSize)

        def baselineMinMax(arr):
            return arr.min(), arr.max()

        numpyTime = tests.repeatManyTimes(baselineMinMax, x)
        print("Baseline x.min(), x.max() test took %s" % numpyTime)
        baselineTime = tests.repeatManyTimes(m.arrayMinMax, x)
        print("Baseline arrayMinMax test took %s" % baselineTime)
        optimizedTime = tests.repeatManyTimes(m.arrayMinMaxParallel, x, maxThreads=2)
        print("Optimized arrayMinMaxParallel test took %s" % optimizedTime)
        # Timings are only reported here; the speed-up assertion is disabled:
        # self.assertLess(optimizedTime, baselineTime)

    def test_arrayCountingSort(self):
        """arrayCountingSort agrees with np.sort and is timed against it."""
        x = np.random.randint(minElem, maxElem, arrSize)
        # Validity
        np.testing.assert_array_equal(m.arrayCountingSort(x, maxElem), np.sort(x))
        # Performance
        baselineTime = tests.repeatManyTimes(np.sort, x)
        print("Baseline np.sort test took %s" % baselineTime)
        optimizedTime = tests.repeatManyTimes(m.arrayCountingSort, x, maxElem)
        print("Optimized arrayCountingSort test took %s" % optimizedTime)
        self.assertLess(optimizedTime, baselineTime)

    def test_arrayCountingArgSort(self):
        """arrayCountingArgSort orders values like np.argsort and is timed against it."""
        self._checkCountingArgSort(m.arrayCountingArgSort, "arrayCountingArgSort")

    def test_arrayCountingArgSortParallel(self):
        """Same checks as test_arrayCountingArgSort, for arrayCountingArgSortThreaded."""
        self._checkCountingArgSort(
            m.arrayCountingArgSortThreaded, "arrayCountingArgSortThreaded"
        )

    def test_arrayCountingArgSortAndUniqueValues(self):
        """Checks the ordering found at index 2 of the function's return value."""
        self._checkCountingArgSort(
            m.arrayCountingArgSortAndUniqueValues,
            "arrayCountingArgSortAndUniqueValues",
            ordering=lambda result: result[2],
        )
# Run the whole suite when this file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()