-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathBlinkLogger.py
347 lines (292 loc) · 13.3 KB
/
BlinkLogger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
# Copyright (C) 2009-2011 AG Projects. See LICENSE for details.
#
import datetime
import os
import sys
from AppKit import NSApp
from application import log
from application.notification import IObserver, NotificationCenter
from application.python.queue import EventQueue
from application.python.types import Singleton
from application.system import makedirs
from sipsimple.configuration.settings import SIPSimpleSettings
from zope.interface import implementer
from pprint import pformat
class BlinkLogger(object, metaclass=Singleton):
def __init__(self):
self.gui_backlog = []
self.gui_logger = self.backlog_keeper
@property
def app_delegate(self):
try:
return self.__dict__['app_delegate']
except KeyError:
return self.__dict__.setdefault('app_delegate', NSApp.delegate())
def backlog_keeper(self, text):
self.gui_backlog.append(text)
def set_gui_logger(self, logger):
self.gui_logger = logger
for line in self.gui_backlog:
self.gui_logger(line)
self.gui_backlog = []
def log_error(self, message):
print(message)
self.gui_logger("Error: "+message)
def log_warning(self, message):
print(message)
self.gui_logger("Warning: "+message)
def log_info(self, message):
print(message)
self.gui_logger(message)
def log_debug(self, message):
if self.app_delegate.debug: # todo: log_debug is called 13-14 times before this flag is initialized from the configuration, meaning they can never be displayed
print(message)
self.gui_logger(message)
def get_status_messages(self):
return self.messages
@implementer(IObserver)
class FileLogger(object, metaclass=Singleton):
# public methods
#
def __init__(self, msrp_level=log.level.INFO):
self.msrp_level = msrp_level
self._siptrace_filename = None
self._siptrace_file = None
self._siptrace_error = False
self._siptrace_start_time = None
self._siptrace_packet_count = 0
self._msrptrace_filename = None
self._msrptrace_file = None
self._msrptrace_error = False
self._pjsiptrace_filename = None
self._pjsiptrace_file = None
self._pjsiptrace_error = False
self._notifications_filename = None
self._notifications_file = None
self._notifications_error = False
self._log_directory_error = False
self._event_queue = None
def start(self):
# try to create the log directory
try:
self._init_log_directory()
except Exception:
pass
# register to receive log notifications
notification_center = NotificationCenter()
notification_center.add_observer(self)
# start the thread processing the notifications
self._event_queue = EventQueue(handler=self._process_notification, name='Log handling')
self._event_queue.start()
def stop(self):
# stop the thread processing the notifications
self._event_queue.stop()
self._event_queue.join()
# close sip trace file
if self._siptrace_file is not None:
self._siptrace_file.close()
self._siptrace_file = None
# close msrp trace file
if self._msrptrace_file is not None:
self._msrptrace_file.close()
self._msrptrace_file = None
# close pjsip trace file
if self._pjsiptrace_file is not None:
self._pjsiptrace_file.close()
self._pjsiptrace_file = None
# close notifications trace file
if self._notifications_file is not None:
self._notifications_file.close()
self._notifications_file = None
# unregister from receiving notifications
notification_center = NotificationCenter()
notification_center.remove_observer(self)
def handle_notification(self, notification):
self._event_queue.put(notification)
def _process_notification(self, notification):
settings = SIPSimpleSettings()
handler = getattr(self, '_NH_%s' % notification.name, None)
if handler is not None:
handler(notification)
handler = getattr(self, '_LH_%s' % notification.name, None)
if handler is not None:
handler(notification)
if notification.name not in ('SIPEngineLog', 'SIPEngineSIPTrace') and settings.logs.trace_notifications and settings.logs.trace_notifications_to_file:
try:
self._init_log_file('notifications')
except Exception:
pass
else:
message = 'Notification name=%s sender=%s data=%s' % (notification.name, notification.sender, pformat(notification.data))
try:
self._notifications_file.write('%s: %s\n' % (datetime.datetime.now(), message))
self._notifications_file.flush()
except Exception as e:
pass
#print('Debug write message failed: %s' % message)
# notification handlers
#
def _NH_CFGSettingsObjectDidChange(self, notification):
settings = SIPSimpleSettings()
if notification.sender is settings:
if 'logs.directory' in notification.data.modified:
# sip trace
if self._siptrace_file is not None:
self._siptrace_file.close()
self._siptrace_file = None
# pjsip trace
if self._pjsiptrace_file is not None:
self._pjsiptrace_file.close()
self._pjsiptrace_file = None
# notifications trace
if self._notifications_file is not None:
self._notifications_file.close()
self._notifications_file = None
# try to create the log directory
try:
self._init_log_directory()
except Exception:
pass
# log handlers
#
def _LH_SIPEngineSIPTrace(self, notification):
settings = SIPSimpleSettings()
if not settings.logs.trace_sip or not settings.logs.trace_sip_to_file:
return
if self._siptrace_start_time is None:
self._siptrace_start_time = notification.datetime
self._siptrace_packet_count += 1
if notification.data.received:
direction = "RECEIVED"
else:
direction = "SENDING"
buf = ["%s: Packet %d, +%s" % (direction, self._siptrace_packet_count, (notification.datetime - self._siptrace_start_time))]
buf.append("%(source_ip)s:%(source_port)d -(SIP over %(transport)s)-> %(destination_ip)s:%(destination_port)d" % notification.data.__dict__)
data = notification.data.data.decode() if isinstance(notification.data.data, bytes) else notification.data.data
buf.append(data)
buf.append('--')
message = '\n'.join(buf)
try:
self._init_log_file('siptrace')
except Exception:
pass
else:
self._siptrace_file.write('%s [%s %d]: %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message))
self._siptrace_file.flush()
def _LH_SIPEngineLog(self, notification):
settings = SIPSimpleSettings()
if not settings.logs.trace_pjsip or not settings.logs.trace_pjsip_to_file:
return
message = "(%(level)d) %(message)s" % notification.data.__dict__
try:
self._init_log_file('pjsiptrace')
except Exception:
pass
else:
self._pjsiptrace_file.write('[%s %d] %s\n' % (os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message))
self._pjsiptrace_file.flush()
def _LH_DNSLookupTrace(self, notification):
settings = SIPSimpleSettings()
if not settings.logs.trace_sip or not settings.logs.trace_sip_to_file:
return
message = 'DNS lookup %(query_type)s %(query_name)s' % notification.data.__dict__
if notification.data.error is None:
message += ' succeeded, ttl=%d: ' % notification.data.answer.ttl
if notification.data.query_type == 'A':
message += ", ".join(record.address for record in notification.data.answer)
elif notification.data.query_type == 'SRV':
message += ", ".join('%d %d %d %s' % (record.priority, record.weight, record.port, record.target) for record in notification.data.answer)
elif notification.data.query_type == 'NAPTR':
message += ", ".join('%d %d "%s" "%s" "%s" %s' % (record.order, record.preference, record.flags, record.service, record.regexp, record.replacement) for record in notification.data.answer)
else:
import dns.resolver
message_map = {dns.resolver.NXDOMAIN: 'DNS record does not exist',
dns.resolver.NoAnswer: 'DNS response contains no answer',
dns.resolver.NoNameservers: 'no DNS name servers could be reached',
dns.resolver.Timeout: 'no DNS response received, the query has timed out'}
message += ' failed: %s' % message_map.get(notification.data.error.__class__, '')
try:
self._init_log_file('siptrace')
except Exception:
pass
else:
self._siptrace_file.write('%s [%s %d]: %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message))
self._siptrace_file.flush()
def _LH_MSRPTransportTrace(self, notification):
settings = SIPSimpleSettings()
if not settings.logs.trace_msrp or not settings.logs.trace_msrp_to_file:
return
arrow = {'incoming': '<--', 'outgoing': '-->'}[notification.data.direction]
local_address = notification.sender.getHost()
local_address = '%s:%d' % (local_address.host, local_address.port)
remote_address = notification.sender.getPeer()
remote_address = '%s:%d' % (remote_address.host, remote_address.port)
data = notification.data.data.decode() if isinstance(notification.data.data, bytes) else notification.data.data
message = '%s %s %s\n' % (local_address, arrow, remote_address) + data
try:
self._init_log_file('msrptrace')
except Exception:
pass
else:
self._msrptrace_file.write('%s [%s %d]: %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message))
self._msrptrace_file.flush()
def _LH_MSRPLibraryLog(self, notification):
settings = SIPSimpleSettings()
if not settings.logs.trace_msrp or not settings.logs.trace_msrp_to_file:
return
if notification.data.level < self.msrp_level:
return
message = '%s%s' % (notification.data.level, notification.data.message)
try:
self._init_log_file('msrptrace')
except Exception:
pass
else:
self._msrptrace_file.write('%s [%s %d]: %s\n' % (notification.datetime, os.path.basename(sys.argv[0]).rstrip('.py'), os.getpid(), message))
self._msrptrace_file.flush()
# private methods
#
def _init_log_directory(self):
settings = SIPSimpleSettings()
log_directory = settings.logs.directory.normalized
try:
makedirs(log_directory)
except Exception as e:
if not self._log_directory_error:
print("failed to create logs directory '%s': %s" % (log_directory, e))
self._log_directory_error = True
self._siptrace_error = True
self._pjsiptrace_error = True
self._notifications_error = True
raise
else:
self._log_directory_error = False
# sip trace
if self._siptrace_filename is None:
self._siptrace_filename = os.path.join(log_directory, 'sip_trace.txt')
self._siptrace_error = False
# msrp trace
if self._msrptrace_filename is None:
self._msrptrace_filename = os.path.join(log_directory, 'msrp_trace.txt')
self._msrptrace_error = False
# pjsip trace
if self._pjsiptrace_filename is None:
self._pjsiptrace_filename = os.path.join(log_directory, 'pjsip_trace.txt')
self._pjsiptrace_error = False
# notifications trace
if self._notifications_filename is None:
self._notifications_filename = os.path.join(log_directory, 'notifications_trace.txt')
self._notifications_error = False
def _init_log_file(self, type):
if getattr(self, '_%s_file' % type) is None:
self._init_log_directory()
filename = getattr(self, '_%s_filename' % type)
try:
setattr(self, '_%s_file' % type, open(filename, 'a'))
except Exception as e:
if not getattr(self, '_%s_error' % type):
print("failed to create log file '%s': %s" % (filename, e))
setattr(self, '_%s_error' % type, True)
raise
else:
setattr(self, '_%s_error' % type, False)