From d08895ad77c7edb894efa99361597a20eb46371b Mon Sep 17 00:00:00 2001 From: Timo Janssen Date: Mon, 25 Nov 2019 16:14:33 +0100 Subject: [PATCH] cleanup --- lib/socket/client.py | 133 ------------------------------------------ lib/socket/package.py | 54 ----------------- listener.py | 31 +++++----- 3 files changed, 15 insertions(+), 203 deletions(-) delete mode 100644 lib/socket/client.py delete mode 100644 lib/socket/package.py diff --git a/lib/socket/client.py b/lib/socket/client.py deleted file mode 100644 index 05db9d4..0000000 --- a/lib/socket/client.py +++ /dev/null @@ -1,133 +0,0 @@ -import asyncio -import datetime -import errno -import fcntl -import json -import os -import uuid - -from lib.socket.package import * -from version import VERSION - - -class Client: - - def __init__(self, loop, hostname, port, client_type, heartbeat_interval=5): - self.loop = loop - self._hostname = hostname - self._port = port - self._heartbeat_interval = heartbeat_interval - - self._client_type = client_type - self._id = uuid.uuid4().hex - self._messages = {} - self._current_message_id = 1 - self._current_message_id_locked = False - - self._last_heartbeat_send = datetime.datetime.now() - self._updates_on_heartbeat = [] - self._cbs = None - self._sock = None - - async def setup(self, cbs=None): - await self._connect() - - self._cbs = cbs - if cbs is None: - self._cbs = {} - - await self._handshake() - - async def _connect(self): - connected = False - while not connected: - print("Trying to connect") - try: - self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self._sock.connect((self._hostname, self._port)) - except Exception: - await asyncio.sleep(5) - else: - connected = True - print("Connected") - fcntl.fcntl(self._sock, fcntl.F_SETFL, os.O_NONBLOCK) - - async def run(self): - while 1: - if (datetime.datetime.now() - self._last_heartbeat_send).total_seconds() > int( - self._heartbeat_interval): - await self._send_heartbeat() - - await self._read_from_socket() - - async def close(self): - print('Close the socket') - self._sock.close() - - async def _read_from_socket(self): - try: - header = self._sock.recv(PACKET_HEADER_LEN) - except socket.error as e: - err = e.args[0] - if err == errno.EAGAIN or err == errno.EWOULDBLOCK: - await asyncio.sleep(1) - pass - else: - # a "real" error occurred - print(e) - # sys.exit(1) - else: - await self._read_message(header) - - async def _read_message(self, header): - packet_type, packet_id, data = await read_packet(self._sock, header) - - if packet_type == 0: - print("Connection lost, trying to reconnect") - try: - await self.setup(self._cbs) - except Exception as e: - print(e) - await asyncio.sleep(5) - elif packet_type == HANDSHAKE_OK: - print(f'Hands shaked with hub') - elif packet_type == HANDSHAKE_FAIL: - print(f'Hub does not want to shake hands') - elif packet_type == HEARTBEAT: - print(f'Heartbeat back from hub') - elif packet_type == REPONSE_OK: - print(f'Hub received update correctly') - elif packet_type == UNKNOW_CLIENT: - print(f'Hub does not recognize us') - await self._handshake() - else: - if packet_type in self._cbs.keys(): - await self._cbs.get(packet_type)(data) - else: - print(f'Message type not implemented: {packet_type}') - - async def _send_message(self, length, message_type, data): - if self._current_message_id_locked: - while self._current_message_id_locked: - await asyncio.sleep(0.1) - - self._current_message_id_locked = True - header = create_header(length, message_type, self._current_message_id) - self._current_message_id += 1 - self._current_message_id_locked = False - - self._sock.send(header + data) - - async def send_message(self, body, message_type): - await self._send_message(len(body), message_type, body) - - async def _handshake(self): - data = json.dumps({'client_id': str(self._id), 'client_type': self._client_type, 'version': VERSION}).encode('utf-8') - await self._send_message(len(data), HANDSHAKE, data) - self._last_heartbeat_send = datetime.datetime.now() - - async def _send_heartbeat(self): - print('Sending heartbeat to hub') - id_encoded = str(self._id).encode('utf-8') - await self._send_message(len(id_encoded), HEARTBEAT, id_encoded) - self._last_heartbeat_send = datetime.datetime.now() diff --git a/lib/socket/package.py b/lib/socket/package.py deleted file mode 100644 index d354cc6..0000000 --- a/lib/socket/package.py +++ /dev/null @@ -1,54 +0,0 @@ -import socket - -HANDSHAKE = 1 -UNKNOW_CLIENT = 2 -SHUTDOWN = 3 -ADD_SERIE = 4 -REMOVE_SERIE = 5 -HANDSHAKE_OK = 6 -HANDSHAKE_FAIL = 7 -HEARTBEAT = 8 -LISTENER_ADD_SERIE = 9 -LISTENER_REMOVE_SERIE = 10 -LISTENER_ADD_SERIE_COUNT = 11 -REPONSE_OK = 12 -UPDATE_SERIES = 13 -TRAIN_MODEL = 14 -FIT_MODEL = 15 -FORECAST_SERIE = 16 -WORKER_RESULT = 17 -WORKER_UPDATE_BUSY = 18 -PKL_UPDATE = 20 - -''' -Header: -size, int, 32bit -type, int 8bit -packetid, int 8bit - -total header length = 48 bits -''' - -PACKET_HEADER_LEN = 48 - - -async def read_packet(sock, header_data=None): - if isinstance(sock, socket.socket): - if header_data is None: - header_data = sock.recv(PACKET_HEADER_LEN) - body_size, packet_type, packet_id = read_header(header_data) - return packet_type, packet_id, sock.recv(body_size) - else: - if header_data is None: - header_data = await sock.read(PACKET_HEADER_LEN) - body_size, packet_type, packet_id = read_header(header_data) - return packet_type, packet_id, await sock.read(body_size) - - -def create_header(size, type, id): - return size.to_bytes(32, byteorder='big') + type.to_bytes(8, byteorder='big') + id.to_bytes(8, byteorder='big') - - -def read_header(binary_data): - return int.from_bytes(binary_data[:32], 'big'), int.from_bytes(binary_data[32:40], 'big'), int.from_bytes( - binary_data[40:48], 'big') diff --git a/listener.py b/listener.py index 66b8427..b4801b3 100644 --- a/listener.py +++ b/listener.py @@ -1,11 +1,10 @@ import asyncio import configparser import datetime -import json from lib.siridb.pipeserver import PipeServer -from lib.socket.client import Client -from lib.socket.package import * +from enodo.client import Client +from enodo.client.package import * class Listener: @@ -15,9 +14,9 @@ def __init__(self, loop, config_path): self._config = configparser.ConfigParser() self._config.read(config_path) self._series_to_watch = () - self._serie_counter_updates = {} + self._serie_updates = {} self._client = Client(loop, self._config['enodo']['hub_hostname'], int(self._config['enodo']['hub_port']), - 'listener', heartbeat_interval=int(self._config['enodo']['heartbeat_interval'])) + 'listener', self._config['enodo']['token'], heartbeat_interval=int(self._config['enodo']['heartbeat_interval'])) self._client_run_task = None self._updater_task = None self._last_update = datetime.datetime.now() @@ -43,26 +42,26 @@ async def _handle_pipe_data(self, data): print("INCOMMING DATA") for serie_name, values in data.items(): if serie_name in self._series_to_watch: - if serie_name in self._serie_counter_updates: - serie_counter = self._serie_counter_updates.get(serie_name) - self._serie_counter_updates[serie_name] = serie_counter + len(values) + print("h2") + if serie_name in self._serie_updates: + self._serie_updates[serie_name].extend(values) else: - self._serie_counter_updates[serie_name] = len(values) - - print(self._serie_counter_updates) + self._serie_updates[serie_name] = values async def _updater(self): while 1: if (datetime.datetime.now() - self._last_update).total_seconds() > int( - self._config['enodo']['counter_update_interval']) and len(self._serie_counter_updates.keys()): + self._config['enodo']['counter_update_interval']) and len(self._serie_updates.keys()): + print("HERE") await self._send_update() self._last_update = datetime.datetime.now() await asyncio.sleep(1) async def _send_update(self): - update_encoded = json.dumps(self._serie_counter_updates).encode('utf-8') - self._serie_counter_updates = {} - await self._client.send_message(update_encoded, LISTENER_ADD_SERIE_COUNT) + print("SENDING UPDATE") + update_encoded = self._serie_updates + await self._client.send_message(update_encoded, LISTENER_NEW_SERIES_POINTS) + self._serie_updates = {} async def start_listener(self): await self._start_siridb_pipeserver() @@ -74,7 +73,7 @@ async def start_listener(self): async def _handle_update_series(self, data): print("Received new list of series to watch") - self._series_to_watch = set(json.loads(data)) + self._series_to_watch = set(data) def close(self): self._client_run_task.cancel()