-
Notifications
You must be signed in to change notification settings - Fork 571
/
Copy pathclass_singleCleaner.py
187 lines (165 loc) · 7.54 KB
/
class_singleCleaner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
"""
The `singleCleaner` class is a timer-driven thread that cleans data structures
to free memory, resends messages when a remote node doesn't respond, and
sends pong messages to keep connections alive if the network isn't busy.
It cleans these data structures in memory:
- inventory (moves data to the on-disk sql database)
- inventorySets (clears then reloads data out of sql database)
It cleans these tables on the disk:
- inventory (clears expired objects)
- pubkeys (clears pubkeys older than 4 weeks which we have not used
personally)
- knownNodes (clears addresses which have not been online for over 3 days)
It resends messages when there has been no response:
- resends getpubkey messages in 5 days (then 10 days, then 20 days, etc...)
- resends msg messages in 5 days (then 10 days, then 20 days, etc...)
"""
import gc
import os
import time
import queues
import state
from bmconfigparser import config
from helper_sql import sqlExecute, sqlQuery
from network import connectionpool, knownnodes, StoppableThread
from tr import _translate
#: How long (in seconds) every pubkey is retained: 4 weeks.
#: Lengthening this is harmless, but shortening it is not advisable:
#: there is a very small possibility that it would keep you from
#: obtaining a needed pubkey for a period of time.
lengthOfTimeToHoldOnToAllPubkeys = 4 * 7 * 24 * 60 * 60
class singleCleaner(StoppableThread):
    """
    The singleCleaner thread class.

    Each cycle it flushes the in-memory inventory, periodically purges
    expired inventory objects and old pubkeys, re-queues getpubkey/msg
    objects which got no response, cleans up knownnodes, per-connection
    tracking data and stale discovered peers, and finally forces
    a garbage collection.
    """
    name = "singleCleaner"
    # timeout passed to ``self.stop.wait()`` between housekeeping cycles
    cycleLength = 300
    # age (in seconds, compared against ``time.time()``) after which
    # an entry of ``state.discoveredPeers`` is dropped
    expireDiscoveredPeers = 300

    def run(self):  # pylint: disable=too-many-branches
        """Run housekeeping cycles until ``state.shutdown`` is set"""
        # automatic collection is switched off here, an explicit
        # gc.collect() is done at the end of every cycle instead
        gc.disable()
        timeWeLastClearedInventoryAndPubkeysTables = 0
        try:
            state.maximumLengthOfTimeToBotherResendingMessages = (
                config.getfloat(
                    'bitmessagesettings', 'stopresendingafterxdays')
                * 24 * 60 * 60
            ) + (
                config.getfloat(
                    'bitmessagesettings', 'stopresendingafterxmonths')
                * (60 * 60 * 24 * 365) / 12)
        except Exception:
            # Either the user hasn't set stopresendingafterxdays and
            # stopresendingafterxmonths yet or the options are missing
            # from the config file.
            # (Exception rather than a bare except, so that SystemExit and
            # KeyboardInterrupt are not swallowed here.)
            state.maximumLengthOfTimeToBotherResendingMessages = float('inf')

        while state.shutdown == 0:
            self.stop.wait(self.cycleLength)
            queues.UISignalQueue.put((
                'updateStatusBar',
                'Doing housekeeping (Flushing inventory in memory to disk...)'
            ))
            state.Inventory.flush()
            queues.UISignalQueue.put(('updateStatusBar', ''))

            # If we are running as a daemon then we are going to fill up the UI
            # queue which will never be handled by a UI. We should clear it to
            # save memory.
            # FIXME redundant?
            if state.thisapp.daemon or not state.enableGUI:
                queues.UISignalQueue.queue.clear()

            tick = int(time.time())
            # the table cleanup and the resending below happen at most
            # once every 7380 seconds (a little over two hours)
            if timeWeLastClearedInventoryAndPubkeysTables < tick - 7380:
                timeWeLastClearedInventoryAndPubkeysTables = tick
                state.Inventory.clean()
                queues.workerQueue.put(('sendOnionPeerObj', ''))
                # pubkeys
                sqlExecute(
                    "DELETE FROM pubkeys WHERE time<? AND usedpersonally='no'",
                    tick - lengthOfTimeToHoldOnToAllPubkeys)

                # Let us resend getpubkey objects if we have not yet heard
                # a pubkey, and also msg objects if we have not yet heard
                # an acknowledgement
                queryreturn = sqlQuery(
                    "SELECT toaddress, ackdata, status FROM sent"
                    " WHERE ((status='awaitingpubkey' OR status='msgsent')"
                    " AND folder='sent' AND sleeptill<? AND senttime>?)",
                    tick,
                    tick - state.maximumLengthOfTimeToBotherResendingMessages
                )
                for toAddress, ackData, status in queryreturn:
                    if status == 'awaitingpubkey':
                        self.resendPubkeyRequest(toAddress)
                    elif status == 'msgsent':
                        self.resendMsg(ackData)

            try:
                # Cleanup knownnodes and handle possible severe exception
                # while writing it to disk
                if state.enableNetwork:
                    knownnodes.cleanupKnownNodes(connectionpool.pool)
            except Exception as err:
                if "Errno 28" in str(err):
                    self.logger.fatal(
                        '(while writing knownnodes to disk)'
                        ' Alert: Your disk or data storage volume is full.'
                    )
                    queues.UISignalQueue.put((
                        'alert',
                        (_translate("MainWindow", "Disk full"),
                         _translate(
                             "MainWindow",
                             'Alert: Your disk or data storage volume'
                             ' is full. Bitmessage will now exit.'),
                         True)
                    ))
                    # FIXME redundant?
                    if state.thisapp.daemon or not state.enableGUI:
                        os._exit(1)  # pylint: disable=protected-access
                else:
                    # Still best-effort, but leave a trace instead of
                    # silently dropping the error.
                    self.logger.warning(
                        'Unexpected error while cleaning up knownnodes',
                        exc_info=True)

            # inv/object tracking
            for connection in connectionpool.pool.connections():
                connection.clean()

            # discovery tracking
            exp = time.time() - singleCleaner.expireDiscoveredPeers
            # Take a snapshot of the items before deleting anything:
            # removing entries while lazily iterating over the dict raises
            # "RuntimeError: dictionary changed size during iteration"
            # on Python 3.
            expired = [
                k for k, v in list(state.discoveredPeers.items())
                if v < exp]
            for k in expired:
                # the entry may already be gone, this is not an error
                state.discoveredPeers.pop(k, None)
            # ..todo:: cleanup pending upload / download

            gc.collect()

    def resendPubkeyRequest(self, address):
        """
        Resend pubkey request for address

        Removes *address* from ``state.neededPubkeys``, sets the matching
        rows of the ``sent`` table back to 'msgqueued' and puts
        a 'sendmessage' command on the worker queue.
        """
        self.logger.debug(
            'It has been a long time and we haven\'t heard a response to our'
            ' getpubkey request. Sending again.'
        )
        try:
            # We need to take this entry out of the neededPubkeys structure
            # because the queues.workerQueue checks to see whether the entry
            # is already present and will not do the POW and send the message
            # because it assumes that it has already done it recently.
            del state.neededPubkeys[address]
        except KeyError:
            pass
        except RuntimeError:
            self.logger.warning(
                "Can't remove %s from neededPubkeys,"
                " requesting pubkey will be delayed",
                address, exc_info=True)

        queues.UISignalQueue.put((
            'updateStatusBar',
            'Doing work necessary to again attempt to request a public key...'
        ))
        sqlExecute(
            "UPDATE sent SET status = 'msgqueued'"
            " WHERE toaddress = ? AND folder = 'sent'", address)
        queues.workerQueue.put(('sendmessage', ''))

    def resendMsg(self, ackdata):
        """
        Resend message by ackdata

        Sets the rows of the ``sent`` table matching *ackdata* back to
        'msgqueued' and puts a 'sendmessage' command on the worker queue.
        """
        self.logger.debug(
            'It has been a long time and we haven\'t heard an acknowledgement'
            ' to our msg. Sending again.'
        )
        sqlExecute(
            "UPDATE sent SET status = 'msgqueued'"
            " WHERE ackdata = ? AND folder = 'sent'", ackdata)
        queues.workerQueue.put(('sendmessage', ''))
        queues.UISignalQueue.put((
            'updateStatusBar',
            'Doing work necessary to again attempt to deliver a message...'
        ))