-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathDbBase.py
168 lines (132 loc) · 3.75 KB
/
DbBase.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import traceback
import abc
from contextlib import contextmanager
import LogBase
import threading
import dbPool
import traceback
import statsd
import settings
class TransactionMixin(object, metaclass=abc.ABCMeta):
@contextmanager
def transaction(self, commit=True):
cursor = self.get_cursor()
if commit:
cursor.execute("BEGIN;")
try:
yield cursor
except Exception as e:
self.log.error("Error in transaction!")
for line in traceback.format_exc().split("\n"):
self.log.error(line)
if commit:
self.log.warn("Rolling back.")
cursor.execute("ROLLBACK;")
else:
self.log.warn("NOT Rolling back.")
raise e
finally:
if commit:
cursor.execute("COMMIT;")
self.release_cursor(cursor)
@contextmanager
def context_cursor(self):
cursor = self.get_cursor()
try:
yield cursor
finally:
self.release_cursor(cursor)
@abc.abstractmethod
def get_cursor(self):
return None
@abc.abstractmethod
def release_cursor(self, cursor):
return None
# Minimal class to handle opening a DB interface.
class DbBase(LogBase.LoggerMixin, TransactionMixin, metaclass=abc.ABCMeta):
# Abstract class (must be subclassed)
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def loggerPath(self):
return None
@abc.abstractmethod
def tableName(self):
return None
@abc.abstractmethod
def pluginName(self):
return None
def __del__(self):
for db_conn in self.db_connection_dict.values():
try:
db_conn.close()
except Exception:
pass
def __init__(self):
self.db_connection_dict = {}
super().__init__()
# self.mon_con = graphitesend.GraphitePickleClient(
# autoreconnect = True,
# group = None,
# prefix = 'MangaCMS.Scrapers.{tableName}.{pluginName}'.format(
# tableName = self.tableName.replace(".", "_"),
# pluginName = self.pluginName.replace(".", "_")
# ),
# system_name = '',
# graphite_server = settings.GRAPHITE_DB_IP,
# graphite_port = 2003,
# debug = True
# )
# self.mon_con.connect()
if settings.GRAPHITE_DB_IP:
self.mon_con = statsd.StatsClient(
host = settings.GRAPHITE_DB_IP,
port = 8125,
prefix = 'MangaCMS.Scrapers.{tableName}.{pluginName}'.format(
tableName = self.tableName.replace(".", "_"),
pluginName = self.pluginName.replace(".", "_"),
)
)
else:
self.mon_con = None
def __del__(self):
if hasattr(self, 'db_connection_dict'):
for conn in self.db_connection_dict:
dbPool.pool.putconn(conn)
@property
def __thread_cursor(self):
'''
__getCursor and __freeConn rely on "magic" thread ID cookies to associate
threads with their correct db pool interfaces
'''
tid = threading.get_ident()
if tid in self.db_connection_dict:
self.log.critical('Recursive access to singleton thread-specific resource!')
self.log.critical("Calling thread ID: '%s'", tid)
self.log.critical("Allocated handles")
for key, value in self.db_connection_dict.items():
self.log.critical(" '%s', '%s'", key, value)
raise ValueError("Recursive cursor retreival! What's going on?")
self.db_connection_dict[tid] = dbPool.pool.getconn()
return self.db_connection_dict[tid].cursor()
def __freeConn(self):
conn = self.db_connection_dict.pop(threading.get_ident())
dbPool.pool.putconn(conn)
def get_cursor(self):
return self.__thread_cursor
def release_cursor(self, cursor):
self.__freeConn()
class TestConn(DbBase):
loggerPath = "TestConn"
pluginName = "TestConn"
tableName = "testDb"
def test_mon_con():
c = TestConn()
print(c)
for newItems in range(5):
# newItems = 9
print("Doing send: ", c.mon_con, 'new_links', newItems)
res = c.mon_con.incr('new_links', 0)
print("Send return: ", res)
pass
if __name__ == "__main__":
test_mon_con()