diff --git a/devo/sender/data.py b/devo/sender/data.py index 58f7f27..f72ae80 100644 --- a/devo/sender/data.py +++ b/devo/sender/data.py @@ -13,6 +13,8 @@ from enum import Enum from pathlib import Path from ssl import SSLWantReadError, SSLWantWriteError +from threading import Thread, Lock, Event +from typing import Optional, Callable import pem from _socket import SHUT_WR @@ -85,10 +87,15 @@ def __init__(self, message: str): :param message: Message describing the exception. It will be also used as `args` attribute in `Exception` class """ + if not isinstance(message, str): + raise TypeError(f'must be str, not {type(message).__name__}') self.message: str = message """Message describing exception""" super().__init__(self.message) + def __str__(self): + return self.message + class SenderConfigSSL: """ @@ -315,14 +322,140 @@ def __init__(self, address=None): raise DevoSenderException(ERROR_MSGS.CANT_CREATE_TCP_CONFIG % str(error)) from error +class SenderBufferFlusher(Thread): + """ Thread class for flushing buffer if the timeout is reached. + + When the "events" value from the SenderBuffer is set to 0, the wait will be set to "None" that means + that the "wait_object" will wait forever (until it will be "wake up"). + + When the "events" value from the SenderBuffer is set to greater than 0, the wait it will be set + to "1.0" that means that the "wait_object" will wait only 1 second. + This will check every second to see if the timeout has been reached. + + """ + + DEFAULT_INTERNAL_WAIT_VALUE = 1.0 + + def __init__(self): + super().__init__() + self.buffer_timeout: float = 10.0 + self.flush_buffer_func = None + self.__first_data_timestamp: Optional[float] = None + self.__running_flag = True + self.__wait_object: Event = Event() + self.__loop_wait = SenderBufferFlusher.DEFAULT_INTERNAL_WAIT_VALUE + + def start(self) -> None: + + # "buffer_timeout" and "flush_buffer_func" must have valid values + if not self.buffer_timeout or self.buffer_timeout <= 0.0: + raise DevoSenderException('"buffer_timeout" is required and must have a value grater than 0.0') + if not self.flush_buffer_func: + raise DevoSenderException('"flush_buffer_func" is required') + super().start() + + def run(self): + while self.__running_flag: + if (self.__first_data_timestamp is not None + and ((time.time() - self.__first_data_timestamp) >= self.buffer_timeout)): + self.flush_buffer_func() + self.__first_data_timestamp = None + + # This "wait_object" can be interrupted is "set()" method is called + called = self.__wait_object.wait(timeout=self.__loop_wait) + if called: + self.__wait_object.clear() + + def initialize_timestamp(self) -> float: + """ This method should be called every time the buffer transits from "0" to "greater than 0" + + :return: Time mark that will be used as reference for flushing the buffer (most of the time it will not be used) + """ + self.__loop_wait = SenderBufferFlusher.DEFAULT_INTERNAL_WAIT_VALUE + self.__first_data_timestamp = time.time() + if not self.__wait_object.is_set(): + self.__wait_object.set() + + return self.__first_data_timestamp + + def stop(self): + """ Interrupts completely the Thread execution. + + :return: + """ + self.__running_flag = False + if not self.__wait_object.is_set(): + self.__wait_object.set() + + def wait(self) -> None: + """ Sets the thread in a "forever" wait status. + + :return: + """ + + self.__loop_wait = None + self.__first_data_timestamp = None + if not self.__wait_object.is_set(): + self.__wait_object.set() + + class SenderBuffer: """Micro class for buffer values""" def __init__(self): - self.length = 19500 - self.compression_level = -1 - self.text_buffer = b"" - self.events = 0 + self.length: int = 19500 + self.compression_level: int = -1 + self.text_buffer: bytes = b"" + self.__events: int = 0 + self.__buffer_flusher = SenderBufferFlusher() + self.__buffer_flusher_is_started: bool = False + self.use_buffer_flusher: bool = False + + @property + def events(self) -> int: + return self.__events + + @events.setter + def events(self, number_of_events): + """ Setter method for events + + This method allows to intercept the values of the "events" attribute for being able + to "trigger" or "pause" the thread, this way the processing time would be optimal. + + :param number_of_events: + :return: + """ + if self.use_buffer_flusher: + if not self.__buffer_flusher_is_started: + self.__buffer_flusher_is_started = True + self.__buffer_flusher.start() + + if self.__events == 0 and number_of_events > 0: + self.__buffer_flusher.initialize_timestamp() + + elif number_of_events == 0: + self.__buffer_flusher.wait() + + self.__events = number_of_events + + @property + def buffer_flusher_func(self) -> Callable: + return self.__buffer_flusher.flush_buffer_func + + @buffer_flusher_func.setter + def buffer_flusher_func(self, flusher_func: Callable): + self.__buffer_flusher.flush_buffer_func = flusher_func + + @property + def buffer_timeout(self) -> float: + return self.__buffer_flusher.buffer_timeout + + @buffer_timeout.setter + def buffer_timeout(self, timeout: float): + self.__buffer_flusher.buffer_timeout = timeout + + def close(self) -> None: + self.__buffer_flusher.stop() class Sender(logging.Handler): @@ -347,6 +480,8 @@ def __init__( timeout=30, debug=False, logger=None, + buffer_timeout: float = 10.0, + use_buffer_flusher: bool = False ): if config is None: raise DevoSenderException(ERROR_MSGS.PROBLEMS_WITH_SENDER_ARGS) @@ -359,7 +494,11 @@ def __init__( self.socket_max_connection = 3600 * 1000 self.last_message = int(time.time()) self.buffer = SenderBuffer() + self.buffer.buffer_timeout = buffer_timeout + self.buffer.buffer_flusher_func = self.flush_buffer + self.buffer.use_buffer_flusher = use_buffer_flusher self.logging = {} + self.buffer_lock = Lock() self.timestart = time.time() if isinstance(config, (dict, Configuration)): @@ -452,6 +591,12 @@ def __connect_ssl(self): and self._sender_config.cert is not None ): context = ssl.create_default_context(cafile=self._sender_config.chain) + context.options |= ssl.OP_NO_SSLv2 + context.options |= ssl.OP_NO_SSLv3 + context.options |= ssl.OP_NO_TLSv1 + context.options |= ssl.OP_NO_TLSv1_1 + context.minimum_version = ssl.TLSVersion.TLSv1_2 + context.maximum_version = ssl.TLSVersion.TLSv1_3 if self._sender_config.sec_level is not None: context.set_ciphers( @@ -462,7 +607,6 @@ def __connect_ssl(self): if self._sender_config.verify_mode is not None: context.verify_mode = self._sender_config.verify_mode - context.load_cert_chain( keyfile=self._sender_config.key, certfile=self._sender_config.cert ) @@ -601,6 +745,7 @@ def close(self): """ Forces socket closure """ + self.buffer.close() if self.socket is not None: try: self.socket.shutdown(SHUT_WR) @@ -889,8 +1034,9 @@ def fill_buffer(self, msg): if msg[-1:] != b"\n": msg += b"\n" - self.buffer.text_buffer += msg - self.buffer.events += 1 + with self.buffer_lock: + self.buffer.text_buffer += msg + self.buffer.events += 1 if len(self.buffer.text_buffer) > self.buffer.length: return self.flush_buffer() return 0 @@ -900,19 +1046,44 @@ def flush_buffer(self): Method for flush-send buffer, its zipped and sent now :return: None """ - if self.buffer.text_buffer: - try: - compressor = zlib.compressobj(self.buffer.compression_level, zlib.DEFLATED, 31) - record = compressor.compress(self.buffer.text_buffer) + compressor.flush() - if self.send_raw(record, zip=True): - return self.buffer.events - return 0 - except Exception as error: - raise DevoSenderException(ERROR_MSGS.FLUSHING_BUFFER_ERROR) from error - finally: - self.buffer.text_buffer = b"" - self.buffer.events = 0 - return 0 + with self.buffer_lock: + if self.buffer.text_buffer: + try: + compressor = zlib.compressobj(self.buffer.compression_level, zlib.DEFLATED, 31) + record = compressor.compress(self.buffer.text_buffer) + compressor.flush() + if self.send_raw(record, zip=True): + return self.buffer.events + return 0 + except Exception as error: + raise DevoSenderException(ERROR_MSGS.FLUSHING_BUFFER_ERROR) from error + finally: + self.buffer.text_buffer = b"" + self.buffer.events = 0 + return 0 + + def get_buffer_info(self) -> dict: + """ + Getter method for the buffer. + Useful for emergency situations when we can't send + :return: dict with "events" and "text_buffer" values + """ + with self.buffer_lock: + return { + "events": self.buffer.events, + "text_buffer": self.buffer.text_buffer, + } + + def set_buffer_info(self, text_buffer: bytes, num_events: int) -> None: + """ + Setter method for the buffer. + Useful for emergency situations when we can't send + :param: text_buffer (bytes) + :param: num_events (int) + :return: + """ + with self.buffer_lock: + self.buffer.text_buffer = text_buffer + self.buffer.events = num_events @staticmethod def for_logging(config=None, con_type=None, tag=None, level=None):