-
Notifications
You must be signed in to change notification settings - Fork 2
/
utils.py
55 lines (44 loc) · 1.6 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# coding:utf-8
import queue
import weakref
import threading
class Singleton(type):
    ''' Metaclass that turns every class using it into a singleton.

    The first call to such a class builds an instance and records it; later
    calls return the recorded instance and ignore their arguments. Instances
    are held through weak references only, so once the caller drops its last
    reference the instance can be collected and the next call builds a new one.
    '''

    # Maps each class to its live instance. Weak values let an unused
    # singleton be garbage-collected instead of living forever.
    _instances = weakref.WeakValueDictionary()

    # Re-entrant on purpose: this lock is shared through the metaclass, so a
    # singleton whose constructor instantiates another singleton re-enters
    # __call__ on the same thread. A plain Lock would deadlock in that case.
    _singleton_lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        ''' Return the instance of ``cls``, creating it on first use.

        ``args`` / ``kwargs`` are forwarded to the class constructor only when
        a new instance has to be built.
        '''
        with cls._singleton_lock:
            # A single .get() hands back a strong reference (or None).
            # Checking membership and then indexing could raise KeyError if
            # the instance were collected between the two steps.
            instance = cls._instances.get(cls)
            if instance is None:
                instance = super(Singleton, cls).__call__(*args, **kwargs)
                cls._instances[cls] = instance
            return instance
class RenewQueue(queue.Queue, object):
    ''' A queue which can contain only one element.

    When an item is put into the queue it will replace the existing one if
    any, so ``get`` always yields the most recently stored item.
    '''

    def __init__(self):
        super(RenewQueue, self).__init__(maxsize=1)
        # Bound base-class implementations, used by the overrides below.
        self.put_super = super(RenewQueue, self).put
        self.get_super = super(RenewQueue, self).get
        # Serialises put() calls with each other and with the non-blocking
        # attempt made at the start of get().
        self._lock = threading.Lock()

    def put(self, item, block=True, timeout=None):
        ''' Store ``item``, discarding the currently queued item if there is one.

        This never blocks. ``block`` and ``timeout`` are accepted only for
        compatibility with ``queue.Queue.put`` (the inherited ``put_nowait``
        calls ``self.put(item, block=False)``) and are otherwise ignored.
        '''
        with self._lock:
            try:
                self.put_super(item, False)
            except queue.Full:
                try:
                    # Drop the stale item to make room for the new one.
                    self.get_super(False)
                except queue.Empty:
                    # A consumer waiting in the lock-free blocking path of
                    # get() took the item first; the slot is already free.
                    pass
                # Every producer holds self._lock, so nothing can have
                # refilled the slot in the meantime.
                self.put_super(item, False)

    def get(self, block=True, timeout=None):
        ''' Remove and return the queued item.

        With ``block`` false, ``queue.Empty`` is raised immediately when the
        queue is empty. Otherwise the call waits for an item, for at most
        ``timeout`` seconds when ``timeout`` is not None, and raises
        ``queue.Empty`` if none arrives in time.
        '''
        with self._lock:
            try:
                return self.get_super(False)
            except queue.Empty:
                if not block:
                    raise
        # Wait without holding the lock so that put() can run and deliver
        # the item this call is waiting for.
        return self.get_super(True, timeout)