forked from Lost-Ark-Market-Online/LostArk-Market-Watcher
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_global_log.py
149 lines (118 loc) · 5.12 KB
/
test_global_log.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
from concurrent.futures import ThreadPoolExecutor, wait
from datetime import datetime
import sys
from time import sleep
from modules.common.singleton import Singleton
from modules.config import Config
from PySide6.QtWidgets import QApplication
from PySide6.QtCore import Signal, QObject
import logging
import os
# strftime pattern used for the %(asctime)s field of every log line.
DATEFMT = '%Y-%m-%dT%H:%M:%S'
# Layout of one log line: timestamp, level name, message.
LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
# The date stamp is taken once at import time so both log files always share
# the same day, even if the module happens to be imported across midnight.
_LOG_DATE = datetime.now().strftime("%m-%d-%Y")
# Records at INFO level and above go to the .log file ...
LOG_FILE_INFO = f'logs/{_LOG_DATE}.log'
# ... and records at ERROR level and above are duplicated into the .err file.
LOG_FILE_ERROR = f'logs/{_LOG_DATE}.err'
class SignalHandler(logging.Handler, QObject):
    """Logging handler that republishes each record's message as a Qt signal.

    The handler is both a ``logging.Handler`` and a ``QObject`` so it can be
    attached to a logger and own a ``Signal`` at the same time.
    """

    # Carries the text of each handled record to any connected slot.
    signal = Signal(str)

    def __init__(self, *args, **kwargs):
        """Initialise both base classes with the same arguments."""
        # Both bases are initialised explicitly, Handler first, then QObject.
        logging.Handler.__init__(self, *args, **kwargs)
        QObject.__init__(self, *args, **kwargs)

    def emit(self, record):
        """Print the record's message to stdout and emit it through ``signal``.

        The text comes from ``record.getMessage()``, i.e. the message with its
        arguments merged in; the handler's formatter is not applied.
        """
        message = record.getMessage()
        print(message)
        self.signal.emit(message)
class AppLogger(metaclass=Singleton):
    """Facade over the 'LostArkMarketWatcher' logger with switchable outputs.

    A console ``StreamHandler`` is always attached.  File handlers (one for
    INFO and above, one for ERROR and above) and Qt ``SignalHandler`` objects
    (same split) can be attached and detached at runtime through the
    ``*_enable`` / ``*_disable`` methods.

    The class is built with the ``Singleton`` metaclass; presumably every
    ``AppLogger()`` call yields the same instance -- confirm against
    ``modules.common.singleton``.
    """

    logger: logging.Logger = None
    # Each handler slot is None while the corresponding output is disabled.
    file_handler_info: logging.FileHandler = None
    file_handler_error: logging.FileHandler = None
    signal_handler_info: SignalHandler = None
    signal_handler_error: SignalHandler = None
    # One formatter shared by every handler this class creates.
    log_formatter: logging.Formatter = logging.Formatter(
        datefmt=DATEFMT, fmt=LOG_FORMAT)

    def __init__(self) -> None:
        """Attach the console handler and, if configured, the file handlers."""
        self.logger = logging.getLogger('LostArkMarketWatcher')
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(self.log_formatter)
        self.logger.addHandler(stream_handler)
        self.logger.setLevel(logging.INFO)

        # Expose the logger's methods directly: AppLogger().info(...), etc.
        self.info = self.logger.info
        self.warning = self.logger.warning
        self.error = self.logger.error
        self.exception = self.logger.exception

        if Config().save_log:
            self.file_enable()

    def _attach(self, handler: logging.Handler, level: int) -> logging.Handler:
        """Apply the shared formatter and *level*, then add *handler* to the logger."""
        handler.setFormatter(self.log_formatter)
        handler.setLevel(level)
        self.logger.addHandler(handler)
        return handler

    def _open_file_handler(self, path: str, level: int) -> logging.Handler:
        """Create an appending FileHandler for *path* and attach it at *level*."""
        # Create the target directory here rather than only in __init__, so
        # file_enable() also works when file logging is switched on later.
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)
        return self._attach(logging.FileHandler(path, mode='a'), level)

    def file_enable(self):
        """Start writing to the INFO and ERROR log files (no-op if already on)."""
        if self.file_handler_info is None:
            self.file_handler_info = self._open_file_handler(
                LOG_FILE_INFO, logging.INFO)
        if self.file_handler_error is None:
            self.file_handler_error = self._open_file_handler(
                LOG_FILE_ERROR, logging.ERROR)

    def file_disable(self):
        """Stop writing to the log files and release their file handles."""
        if self.file_handler_info:
            self.logger.removeHandler(self.file_handler_info)
            # removeHandler() does not close the stream; do it explicitly.
            self.file_handler_info.close()
            self.file_handler_info = None
        if self.file_handler_error:
            self.logger.removeHandler(self.file_handler_error)
            self.file_handler_error.close()
            self.file_handler_error = None

    def signal_enable(self):
        """Attach the INFO and ERROR signal handlers (no-op if already on)."""
        if self.signal_handler_info is None:
            self.signal_handler_info = self._attach(
                SignalHandler(), logging.INFO)
        # The error handler is tracked in its own slot; it must not share
        # file_handler_error, or file and signal outputs would clobber each
        # other and signal_disable() could never remove it.
        if self.signal_handler_error is None:
            self.signal_handler_error = self._attach(
                SignalHandler(), logging.ERROR)

    def signal_disable(self):
        """Detach both signal handlers from the logger."""
        if self.signal_handler_info:
            self.logger.removeHandler(self.signal_handler_info)
            self.signal_handler_info = None
        if self.signal_handler_error:
            self.logger.removeHandler(self.signal_handler_error)
            self.signal_handler_error = None
def test():
    """Worker body for the smoke test: pause one second, then log at INFO."""
    sleep(1)
    app_logger = AppLogger()
    app_logger.info('This is an INFO message inside a Thread')
class TestApp(QApplication):
    """QApplication used by the smoke test to receive log messages as signals."""

    def __init__(self, *args, **kwargs):
        """Create the application and hook ``test_signal`` to INFO log signals."""
        super().__init__(*args, **kwargs)
        app_logger = AppLogger()
        # signal_enable() must run first: it creates signal_handler_info.
        app_logger.signal_enable()
        app_logger.signal_handler_info.signal.connect(self.test_signal)

    def test_signal(self, message):
        """Slot: echo the received log message to stdout with a prefix."""
        print(f"PRINT: {message}")
if __name__ == '__main__':
    # Manual smoke test: log from several worker threads at once and check
    # that the logger and its signal handler cope with concurrent use.
    AppLogger()
    try:
        # Keep a reference so the application object stays alive for the run.
        app = TestApp()
        # NOTE(review): no Qt event loop is started here, so signal deliveries
        # coming from worker threads may never reach TestApp.test_signal --
        # confirm whether that is intended.
        # The context manager shuts the pool down even if a submit() raises.
        with ThreadPoolExecutor(max_workers=5) as executor:
            # 16 jobs on 5 workers, so jobs queue up behind each other.
            test_futures = [executor.submit(test) for _ in range(16)]
            wait(test_futures)
    except Exception as e:
        AppLogger().exception(e)