Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improvements for gzip sending #278

Closed
wants to merge 14 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
211 changes: 191 additions & 20 deletions devo/sender/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from enum import Enum
from pathlib import Path
from ssl import SSLWantReadError, SSLWantWriteError
from threading import Thread, Lock, Event
from typing import Optional, Callable

import pem
from _socket import SHUT_WR
Expand Down Expand Up @@ -85,10 +87,15 @@ def __init__(self, message: str):
:param message: Message describing the exception. It will be also
used as `args` attribute in `Exception` class
"""
if not isinstance(message, str):
raise TypeError(f'must be str, not {type(message).__name__}')
self.message: str = message
"""Message describing exception"""
super().__init__(self.message)

def __str__(self):
    """Return the message text stored on this exception as ``self.message``."""
    return self.message


class SenderConfigSSL:
"""
Expand Down Expand Up @@ -315,14 +322,140 @@ def __init__(self, address=None):
raise DevoSenderException(ERROR_MSGS.CANT_CREATE_TCP_CONFIG % str(error)) from error


class SenderBufferFlusher(Thread):
    """Thread that flushes the sender buffer once its oldest data is too old.

    The thread is "armed" by :meth:`initialize_timestamp`, which records the
    moment the buffer received its first event. While armed, the loop wakes
    every ``DEFAULT_INTERNAL_WAIT_VALUE`` seconds and calls
    ``flush_buffer_func`` as soon as ``buffer_timeout`` seconds have elapsed
    since that moment.

    :meth:`wait` disarms the thread: the recorded moment is dropped and the
    loop blocks without a timeout until :meth:`initialize_timestamp` or
    :meth:`stop` wakes it up again.

    NOTE: exceptions raised by ``flush_buffer_func`` are not caught by the
    loop, so they end the thread.
    """

    DEFAULT_INTERNAL_WAIT_VALUE = 1.0

    def __init__(self):
        super().__init__()
        # Seconds the first buffered event may wait before a flush is forced.
        self.buffer_timeout: float = 10.0
        # Zero-argument callable invoked by the thread to flush the buffer.
        self.flush_buffer_func: Optional[Callable] = None
        # Time mark of the first buffered event; None while disarmed.
        self.__first_data_timestamp: Optional[float] = None
        self.__running_flag = True
        self.__wait_object: Event = Event()
        # Timeout passed to the Event wait; None means "block until woken".
        self.__loop_wait: Optional[float] = SenderBufferFlusher.DEFAULT_INTERNAL_WAIT_VALUE
        # Guards the (timestamp, loop wait) pair against concurrent updates
        # from producer threads and from the flusher loop itself.
        self.__state_lock = Lock()

    def start(self) -> None:
        """Validate the configuration and start the thread.

        :raises DevoSenderException: if ``buffer_timeout`` is missing or not
            greater than 0.0, or if ``flush_buffer_func`` is not set
        """
        # "buffer_timeout" and "flush_buffer_func" must have valid values
        if not self.buffer_timeout or self.buffer_timeout <= 0.0:
            raise DevoSenderException('"buffer_timeout" is required and must have a value grater than 0.0')
        if not self.flush_buffer_func:
            raise DevoSenderException('"flush_buffer_func" is required')
        super().start()

    def run(self):
        """Loop until :meth:`stop` is called, flushing when the timeout expires."""
        while self.__running_flag:
            # Work on a snapshot: another thread may disarm or re-arm the
            # timer at any point, and the None check plus the subtraction
            # must see the same value.
            armed_at = self.__first_data_timestamp
            if armed_at is not None and (time.time() - armed_at) >= self.buffer_timeout:
                self.flush_buffer_func()
                with self.__state_lock:
                    # Only disarm if nobody re-armed the timer while the
                    # flush was running; otherwise the new data would lose
                    # its timeout and stay in the buffer indefinitely.
                    if self.__first_data_timestamp is armed_at:
                        self.__first_data_timestamp = None

            # This wait is interrupted as soon as "set()" is called
            woken = self.__wait_object.wait(timeout=self.__loop_wait)
            if woken:
                self.__wait_object.clear()

    def initialize_timestamp(self) -> float:
        """Arm the timer; call when the buffer goes from "0" to "greater than 0".

        :return: Time mark that will be used as reference for flushing the
            buffer (most of the time it will not be used)
        """
        with self.__state_lock:
            self.__loop_wait = SenderBufferFlusher.DEFAULT_INTERNAL_WAIT_VALUE
            armed_at = time.time()
            self.__first_data_timestamp = armed_at
        # Wake the loop so it picks up the new polling interval.
        self.__wait_object.set()

        # Return the local value: the attribute may already have been
        # changed by another thread.
        return armed_at

    def stop(self):
        """Ask the loop to finish and wake it up so it can exit.

        :return:
        """
        self.__running_flag = False
        self.__wait_object.set()

    def wait(self) -> None:
        """Disarm the timer and make the loop block until it is woken again.

        :return:
        """
        with self.__state_lock:
            self.__loop_wait = None
            self.__first_data_timestamp = None
        # Wake the loop so its next wait uses the "no timeout" setting.
        self.__wait_object.set()


class SenderBuffer:
    """Holds the pending (not yet sent) data and the optional timed flusher.

    Assigning to :attr:`events` is how the flusher thread gets driven: when
    ``use_buffer_flusher`` is enabled, the first assignment starts the
    thread, a change from 0 to a positive count arms its timer and a change
    to 0 puts it back to sleep.
    """

    def __init__(self):
        # Size of "text_buffer" (in bytes) above which the sender flushes.
        self.length: int = 19500
        # Level handed to zlib when the buffer is compressed (-1 = zlib default).
        self.compression_level: int = -1
        self.text_buffer: bytes = b""
        # Written directly (not through the "events" property) because the
        # setter reads attributes that are only created further below.
        self.__events: int = 0
        self.__buffer_flusher = SenderBufferFlusher()
        self.__buffer_flusher_is_started: bool = False
        self.use_buffer_flusher: bool = False

    @property
    def events(self) -> int:
        """Number of events currently stored in the buffer."""
        return self.__events

    @events.setter
    def events(self, number_of_events):
        """ Setter method for events

        This method allows to intercept the values of the "events" attribute for being able
        to "trigger" or "pause" the thread, this way the processing time would be optimal.

        :param number_of_events: new number of buffered events
        :return:
        """
        if self.use_buffer_flusher:
            # The thread is started lazily, the first time it is needed.
            if not self.__buffer_flusher_is_started:
                self.__buffer_flusher_is_started = True
                self.__buffer_flusher.start()

            if self.__events == 0 and number_of_events > 0:
                # Buffer just received its first event: start counting time.
                self.__buffer_flusher.initialize_timestamp()

            elif number_of_events == 0:
                # Buffer emptied: nothing to flush until new data arrives.
                self.__buffer_flusher.wait()

        self.__events = number_of_events

    @property
    def buffer_flusher_func(self) -> Callable:
        """Callable the flusher thread invokes to flush the buffer."""
        return self.__buffer_flusher.flush_buffer_func

    @buffer_flusher_func.setter
    def buffer_flusher_func(self, flusher_func: Callable):
        self.__buffer_flusher.flush_buffer_func = flusher_func

    @property
    def buffer_timeout(self) -> float:
        """Timeout value configured on the flusher thread."""
        return self.__buffer_flusher.buffer_timeout

    @buffer_timeout.setter
    def buffer_timeout(self, timeout: float):
        self.__buffer_flusher.buffer_timeout = timeout

    def close(self) -> None:
        """Ask the flusher thread to stop."""
        self.__buffer_flusher.stop()


class Sender(logging.Handler):
Expand All @@ -347,6 +480,8 @@ def __init__(
timeout=30,
debug=False,
logger=None,
buffer_timeout: float = 10.0,
use_buffer_flusher: bool = False
):
if config is None:
raise DevoSenderException(ERROR_MSGS.PROBLEMS_WITH_SENDER_ARGS)
Expand All @@ -359,7 +494,11 @@ def __init__(
self.socket_max_connection = 3600 * 1000
self.last_message = int(time.time())
self.buffer = SenderBuffer()
self.buffer.buffer_timeout = buffer_timeout
self.buffer.buffer_flusher_func = self.flush_buffer
self.buffer.use_buffer_flusher = use_buffer_flusher
self.logging = {}
self.buffer_lock = Lock()

self.timestart = time.time()
if isinstance(config, (dict, Configuration)):
Expand Down Expand Up @@ -452,6 +591,12 @@ def __connect_ssl(self):
and self._sender_config.cert is not None
):
context = ssl.create_default_context(cafile=self._sender_config.chain)
context.options |= ssl.OP_NO_SSLv2
context.options |= ssl.OP_NO_SSLv3
context.options |= ssl.OP_NO_TLSv1
context.options |= ssl.OP_NO_TLSv1_1
context.minimum_version = ssl.TLSVersion.TLSv1_2
context.maximum_version = ssl.TLSVersion.TLSv1_3

if self._sender_config.sec_level is not None:
context.set_ciphers(
Expand All @@ -462,7 +607,6 @@ def __connect_ssl(self):

if self._sender_config.verify_mode is not None:
context.verify_mode = self._sender_config.verify_mode

context.load_cert_chain(
keyfile=self._sender_config.key, certfile=self._sender_config.cert
)
Expand Down Expand Up @@ -601,6 +745,7 @@ def close(self):
"""
Forces socket closure
"""
self.buffer.close()
if self.socket is not None:
try:
self.socket.shutdown(SHUT_WR)
Expand Down Expand Up @@ -889,8 +1034,9 @@ def fill_buffer(self, msg):
if msg[-1:] != b"\n":
msg += b"\n"

self.buffer.text_buffer += msg
self.buffer.events += 1
with self.buffer_lock:
self.buffer.text_buffer += msg
self.buffer.events += 1
if len(self.buffer.text_buffer) > self.buffer.length:
return self.flush_buffer()
return 0
Expand All @@ -900,19 +1046,44 @@ def flush_buffer(self):
Method for flush-send buffer, its zipped and sent now
:return: None
"""
if self.buffer.text_buffer:
try:
compressor = zlib.compressobj(self.buffer.compression_level, zlib.DEFLATED, 31)
record = compressor.compress(self.buffer.text_buffer) + compressor.flush()
if self.send_raw(record, zip=True):
return self.buffer.events
return 0
except Exception as error:
raise DevoSenderException(ERROR_MSGS.FLUSHING_BUFFER_ERROR) from error
finally:
self.buffer.text_buffer = b""
self.buffer.events = 0
return 0
with self.buffer_lock:
if self.buffer.text_buffer:
try:
compressor = zlib.compressobj(self.buffer.compression_level, zlib.DEFLATED, 31)
record = compressor.compress(self.buffer.text_buffer) + compressor.flush()
if self.send_raw(record, zip=True):
return self.buffer.events
return 0
except Exception as error:
raise DevoSenderException(ERROR_MSGS.FLUSHING_BUFFER_ERROR) from error
finally:
self.buffer.text_buffer = b""
self.buffer.events = 0
return 0

def get_buffer_info(self) -> dict:
    """
    Take a snapshot of the pending buffer while holding the buffer lock.
    Intended for emergency situations in which the data cannot be sent.
    :return: dict with "events" and "text_buffer" values
    """
    with self.buffer_lock:
        pending = self.buffer
        snapshot = dict(events=pending.events, text_buffer=pending.text_buffer)
    return snapshot

def set_buffer_info(self, text_buffer: bytes, num_events: int) -> None:
    """
    Overwrite the pending buffer while holding the buffer lock.
    Intended for emergency situations in which the data cannot be sent.
    :param: text_buffer (bytes) new buffer content
    :param: num_events (int) number of events contained in text_buffer
    :return:
    """
    with self.buffer_lock:
        target = self.buffer
        target.text_buffer = text_buffer
        target.events = num_events

@staticmethod
def for_logging(config=None, con_type=None, tag=None, level=None):
Expand Down
Loading