-
Notifications
You must be signed in to change notification settings - Fork 1
/
osc_receiver.py
57 lines (48 loc) · 1.92 KB
/
osc_receiver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import liblo
import pickle
import threading
import traceback_printer
class OscReceiver(liblo.Server):
    """OSC server that queues incoming messages for deferred dispatch.

    A background thread created by start() receives messages and appends
    them to an internal queue. The callbacks registered through
    add_method() are only invoked when serve() is called, and they run on
    the thread that calls serve().
    """

    # Upper bound, in milliseconds, on how long one recv() call may block.
    # Without a bound the receive loop only notices stop() after the next
    # message arrives.
    # NOTE(review): relies on liblo.Server.recv accepting a timeout in
    # milliseconds, as described in the pyliblo documentation.
    _RECV_TIMEOUT_MS = 100

    def __init__(self, port=None, proto=liblo.TCP, name=None):
        """Create the underlying liblo server and the empty message queue.

        port and proto are passed unchanged to liblo.Server.__init__.
        name is used to label the receive thread; when it is empty or None
        the class name is used instead.
        """
        self._name = name if name else self.__class__.__name__
        liblo.Server.__init__(self, port, proto)
        self._running = False
        self._freed = False
        self._queue = []
        # Guards self._queue, which is shared between the receive thread
        # (_callback) and the thread calling serve().
        self._lock = threading.Lock()

    def add_method(self, path, typespec, callback_func, user_data=None):
        """Register callback_func for messages matching path and typespec.

        The real liblo callback is always self._callback; callback_func and
        user_data travel along as liblo user data so they can be queued
        together with each received message.
        """
        liblo.Server.add_method(self, path, typespec, self._callback,
                                (callback_func, user_data))

    def _callback(self, path, args, types, src, data):
        """Queue one received message instead of handling it immediately.

        data is the (callback_func, user_data) pair that add_method()
        registered. It is unpacked in the body because tuple parameters in
        a function signature are a syntax error on Python 3.
        """
        callback_func, user_data = data
        with self._lock:
            self._queue.append(
                (path, args, types, src, callback_func, user_data))

    def start(self):
        """Start the daemon thread that receives messages.

        Raises:
            Exception: if the server has already been freed by a previous
                run of the receive thread.
        """
        if self._freed:
            raise Exception("Cannot call OscReceiver.start a second time. You need to create a new OscReceiver instance.")
        self._running = True
        serve_thread = threading.Thread(name="%s.server_thread" % self._name,
                                        target=self._serve)
        serve_thread.daemon = True
        serve_thread.start()

    def _serve(self):
        """Receive messages until stop() is called, then free the server."""
        while self._running:
            # Bounded wait so that the loop re-checks self._running
            # regularly even when no messages arrive.
            self.recv(self._RECV_TIMEOUT_MS)
        self.free()
        self._freed = True

    def serve(self):
        """Dispatch every queued message to its registered callback.

        The queue is swapped out under the lock and the callbacks run
        outside of it, so the receive thread is never blocked by a slow
        callback and a callback may safely call serve() itself.

        If a callback raises, the exception propagates to the caller. The
        failing message is dropped (re-delivering it would raise again on
        every call) and the messages that were not delivered yet are put
        back at the front of the queue for the next serve() call.
        """
        with self._lock:
            pending = self._queue
            self._queue = []

        dispatched = 0
        try:
            for message in pending:
                # Counted before the call so that a failing message is
                # treated as consumed.
                dispatched += 1
                self._fire_callback_with_exception_handler(*message)
        finally:
            leftover = pending[dispatched:]
            if leftover:
                with self._lock:
                    # Prepend to keep arrival order relative to messages
                    # queued while the callbacks were running.
                    self._queue[:0] = leftover

    def _fire_callback_with_exception_handler(self, path, args, types, src, callback, user_data):
        """Invoke callback; print the traceback and re-raise on failure."""
        try:
            callback(path, args, types, src, user_data)
        except Exception:
            traceback_printer.print_traceback()
            # Bare raise keeps the original traceback on Python 2 as well.
            raise

    def stop(self):
        """Ask the receive thread to leave its loop and free the server."""
        self._running = False