forked from kif/imageAlignment
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcythreading.pyx
258 lines (219 loc) · 8.51 KB
/
cythreading.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
#Cython implementaation of fast locking
# author: Stephan Behnel additions from Jerome Kieffer
# http://code.activestate.com/recipes/577336/
# C++ version
from cpython cimport pythread
from cpython.exc cimport PyErr_NoMemory
from libcpp.list cimport list
from time import time as _time
from time import sleep as _sleep
cdef class FastRLock:
"""Fast, re-entrant locking.
Under uncongested conditions, the lock is never acquired but only
counted. Only when a second thread comes in and notices that the
lock is needed, it acquires the lock and notifies the first thread
to release it when it's done. This is all made possible by the
wonderful GIL.
"""
cdef pythread.PyThread_type_lock _real_lock
cdef long _owner # ID of thread owning the lock
cdef int _count # re-entry count
cdef int _pending_requests # number of pending requests for real lock
cdef bint _is_locked # whether the real lock is acquired
def __cinit__(self):
self._owner = -1
self._count = 0
self._is_locked = False
self._pending_requests = 0
self._real_lock = pythread.PyThread_allocate_lock()
if self._real_lock is NULL:
PyErr_NoMemory()
def __dealloc__(self):
if self._real_lock is not NULL:
pythread.PyThread_free_lock(self._real_lock)
self._real_lock = NULL
def acquire(self, bint blocking=True):
return lock_lock(self, pythread.PyThread_get_thread_ident(), blocking)
def release(self):
if self._owner != pythread.PyThread_get_thread_ident():
raise RuntimeError("cannot release un-acquired lock")
unlock_lock(self)
# compatibility with threading.RLock
def __enter__(self):
# self.acquire()
return lock_lock(self, pythread.PyThread_get_thread_ident(), True)
def __exit__(self, t, v, tb):
# self.release()
if self._owner != pythread.PyThread_get_thread_ident():
raise RuntimeError("cannot release un-acquired lock")
unlock_lock(self)
def _is_owned(self):
return self._owner == pythread.PyThread_get_thread_ident()
cdef inline bint lock_lock(FastRLock lock, long current_thread, bint blocking) nogil:
# Note that this function *must* hold the GIL when being called.
# We just use 'nogil' in the signature to make sure that no Python
# code execution slips in that might free the GIL
if lock._count:
# locked! - by myself?
if current_thread == lock._owner:
lock._count += 1
return 1
elif not lock._pending_requests:
# not locked, not requested - go!
lock._owner = current_thread
lock._count = 1
return 1
# need to get the real lock
return _acquire_lock(
lock, current_thread,
pythread.WAIT_LOCK if blocking else pythread.NOWAIT_LOCK)
cdef bint _acquire_lock(FastRLock lock, long current_thread, int wait) nogil:
# Note that this function *must* hold the GIL when being called.
# We just use 'nogil' in the signature to make sure that no Python
# code execution slips in that might free the GIL
if not lock._is_locked and not lock._pending_requests:
# someone owns it but didn't acquire the real lock - do that
# now and tell the owner to release it when done. Note that we
# do not release the GIL here as we must absolutely be the one
# who acquires the lock now.
if not pythread.PyThread_acquire_lock(lock._real_lock, wait):
return 0
#assert not lock._is_locked
lock._is_locked = True
lock._pending_requests += 1
with nogil:
# wait for the lock owning thread to release it
locked = pythread.PyThread_acquire_lock(lock._real_lock, wait)
lock._pending_requests -= 1
#assert not lock._is_locked
#assert lock._count == 0
if not locked:
return 0
lock._is_locked = True
lock._owner = current_thread
lock._count = 1
return 1
cdef inline void unlock_lock(FastRLock lock) nogil:
# Note that this function *must* hold the GIL when being called.
# We just use 'nogil' in the signature to make sure that no Python
# code execution slips in that might free the GIL
#assert lock._owner == pythread.PyThread_get_thread_ident()
#assert lock._count > 0
lock._count -= 1
if lock._count == 0:
lock._owner = -1
if lock._is_locked:
pythread.PyThread_release_lock(lock._real_lock)
lock._is_locked = False
cdef class Condition:
cdef FastRLock _lock
cdef list[FastRLock] _waiters
def __cinit__(self, lock=None):
if lock is None:
lock = FastRLock()
self._lock = lock
self._waiters = list[FastRLock]()
def __dealloc__(self):
self._waiters.empty()
def acquire(self, bint blocking=True):
self._lock.acquire(blocking)
def release(self):
self._lock.release()
def __enter__(self):
return self._lock.__enter__()
def __exit__(self, *args):
return self._lock.__exit__(*args)
def __repr__(self):
return "<Condition(%s, %d)>" % (self._lock, self._waiters.size())
def _release_save(self):
return self._lock.release() # No state to save
def _acquire_restore(self, x):
return self._lock.acquire() # Ignore saved state
def _is_owned(self):
return self._lock._is_owned()
def wait(self, timeout=None):
cdef double delay,endtime,remaining,ctimeout
if timeout is None:
ctimeout = 0
else:
ctimeout = <double>timeout
if not self._is_owned():
raise RuntimeError("cannot wait on un-acquired lock")
cdef FastRLock waiter = FastRLock()
waiter.acquire()
self._waiters.push_back(waiter)
saved_state = self._release_save()
try: # restore state no matter what (e.g., KeyboardInterrupt)
if ctimeout == 0:
waiter.acquire()
else:
# Balancing act: We can't afford a pure busy loop, so we
# have to sleep; but if we sleep the whole timeout time,
# we'll be unresponsive. The scheme here sleeps very
# little at first, longer as time goes on, but never longer
# than 20 times per second (or the timeout time remaining).
endtime = _time() + ctimeout
delay = 0.0005 # 500 us -> initial delay of 1 ms
while True:
gotit = waiter.acquire(0)
if gotit:
break
remaining = endtime - _time()
if remaining <= 0:
break
delay = min(delay * 2., remaining, .05)
_sleep(delay)
if not gotit:
print("%s.wait(%s): timed out", self, ctimeout)
try:
self._waiters.remove(waiter)
except ValueError:
pass
finally:
self._acquire_restore(saved_state)
def notify(self, n=1):
if not self._is_owned():
raise RuntimeError("cannot notify on un-acquired lock")
self._waiters
if self.waiters.size()==0:
return
for i in range(self.waiters.size()):
if i>=n:
break
waiter=self.waiters[i]
waiter.release()
try:
self.waiters.remove(waiter)
except ValueError:
pass
def notify_all(self):
self.notify(self._waiters.size())
notifyAll = notify_all
cdef class Semaphore:
cdef int _value
cdef Condition _cond
def __cinit__(self, int value=1):
if value < 0:
raise ValueError("semaphore initial value must be >= 0")
self._cond = Condition(FastRLock())
self._value = value
def acquire(self, blocking=True):
rc = False
self._cond.acquire()
while self._value == 0:
if not blocking:
break
self._cond.wait()
else:
self._value = self._value - 1
rc = True
self._cond.release()
return rc
__enter__ = acquire
def release(self):
self._cond.acquire()
self._value = self._value + 1
self._cond.notify()
self._cond.release()
def __exit__(self, t, v, tb):
self.release()