From 10aa04239c8cfd29ebfd87ed3b8f4657381ce86a Mon Sep 17 00:00:00 2001 From: timoj Date: Tue, 2 Mar 2021 23:23:22 +0100 Subject: [PATCH 1/2] cleanup, config, readme --- .gitignore | 3 ++ lib/config.py | 47 ++++++++++++++++++++++++ lib/siridb/pipeserver.py | 7 ++-- listener.py | 13 ++++--- main.py | 10 +++++- readme.md | 42 +++++++--------------- test/bytes.py | 14 -------- test/senddata.py | 29 --------------- test/server.py | 77 ---------------------------------------- version.py | 2 +- 10 files changed, 85 insertions(+), 159 deletions(-) create mode 100644 lib/config.py delete mode 100644 test/bytes.py delete mode 100644 test/senddata.py delete mode 100644 test/server.py diff --git a/.gitignore b/.gitignore index d82498e..529130a 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ listener_local.conf listener_docker.conf *.pyc +.vscode +.enodo_id +test \ No newline at end of file diff --git a/lib/config.py b/lib/config.py new file mode 100644 index 0000000..50d16f8 --- /dev/null +++ b/lib/config.py @@ -0,0 +1,47 @@ +import os +from configparser import RawConfigParser, _UNSET, SectionProxy, ConfigParser + +EMPTY_CONFIG_FILE = { + 'enodo': { + 'hub_hostname': '', + 'hub_port': '9103', + 'heartbeat_interval': '25', + 'pipe_path': '', + 'counter_update_interval': '10', + 'internal_security_token': '' + } +} + +def create_standard_config_file(path): + _config = ConfigParser() + + for section in EMPTY_CONFIG_FILE: + _config.add_section(section) + for option in EMPTY_CONFIG_FILE[section]: + _config.set(section, option, EMPTY_CONFIG_FILE[section][option]) + + with open(path, "w") as fh: + _config.write(fh) + +class EnodoConfigParser(RawConfigParser): + + def __getitem__(self, key): + if key != self.default_section and not self.has_section(key): + return SectionProxy(self, key) + return self._proxies[key] + + def has_option(self, section, option): + return True + + def get(self, section, option, *, raw=False, vars=None, fallback=_UNSET): + """Edited default get func from RawConfigParser + """ + env_value = os.getenv(option.upper()) + if env_value is not None: + return env_value + + try: + return super(EnodoConfigParser, self).get( + section, option, raw=False, vars=None, fallback=_UNSET) + except Exception as _: + raise Exception(f'Invalid config, missing option "{option}" in section "{section}" or environment variable "{option.upper()}"') \ No newline at end of file diff --git a/lib/siridb/pipeserver.py b/lib/siridb/pipeserver.py index 374d374..5e22560 100644 --- a/lib/siridb/pipeserver.py +++ b/lib/siridb/pipeserver.py @@ -22,5 +22,8 @@ def _on_data(self, data): """ series names are returned as c strings (0 terminated) """ - data = {k.rstrip('\x00'): v for k, v in data.items()} - self._on_data_cb(data) + try: + data = {k.rstrip('\x00'): v for k, v in data.items()} + self._on_data_cb(data) + except Exception as _: + pass diff --git a/listener.py b/listener.py index b4801b3..f106768 100644 --- a/listener.py +++ b/listener.py @@ -1,22 +1,26 @@ import asyncio import configparser import datetime +import os from lib.siridb.pipeserver import PipeServer +from lib.config import EnodoConfigParser from enodo.client import Client -from enodo.client.package import * +from enodo.protocol.package import * class Listener: def __init__(self, loop, config_path): self._loop = loop - self._config = configparser.ConfigParser() - self._config.read(config_path) + self._config = EnodoConfigParser() + if config_path is not None and os.path.exists(config_path): + self._config.read(config_path) self._series_to_watch = () self._serie_updates = {} self._client = Client(loop, self._config['enodo']['hub_hostname'], int(self._config['enodo']['hub_port']), - 'listener', self._config['enodo']['token'], heartbeat_interval=int(self._config['enodo']['heartbeat_interval'])) + 'listener', self._config['enodo']['internal_security_token'], + heartbeat_interval=int(self._config['enodo']['heartbeat_interval']), identity_file_path=".enodo_id") self._client_run_task = None self._updater_task = None self._last_update = datetime.datetime.now() @@ -42,7 +46,6 @@ async def _handle_pipe_data(self, data): print("INCOMMING DATA") for serie_name, values in data.items(): if serie_name in self._series_to_watch: - print("h2") if serie_name in self._serie_updates: self._serie_updates[serie_name].extend(values) else: diff --git a/main.py b/main.py index 6b88821..60803bd 100755 --- a/main.py +++ b/main.py @@ -1,10 +1,18 @@ import argparse import asyncio +import os from listener import Listener +from lib.config import create_standard_config_file + parser = argparse.ArgumentParser(description='Process config') -parser.add_argument('--config', help='Config path', required=True) +parser.add_argument('--config', help='Config path', required=False) +parser.add_argument('--create_config', help='Create standard config file', action='store_true', default=False) + +if parser.parse_args().create_config: + create_standard_config_file(os.path.join(os.path.dirname(os.path.realpath(__file__)), 'default.conf')) + exit() loop = asyncio.get_event_loop() listener = Listener(loop, parser.parse_args().config) diff --git a/readme.md b/readme.md index a120b6c..3b0b670 100644 --- a/readme.md +++ b/readme.md @@ -1,25 +1,7 @@ -# Listener - -## Deployment and communication - -message types: - - type | desc | -|:------------- |:-------------| -| HANDSHAKE | General type for sending handshake | -| UNKNOW_CLIENT | General type for sending server has received handshake from unknown client | -| SHUTDOWN | General type for sender is shutingdown | -| ADD_SERIE | General type for adding serie | -| REMOVE_SERIE | General type for removing serie | -| UPDATE_SERIES | Type for complete new list of series to watch | -| HANDSHAKE_OK | General type for successful received handshake | -| HANDSHAKE_FAIL | General type for a not succesful handshake | -| HEARTBEAT | General type for sending heartbeat | -| LISTENER_ADD_SERIE_COUNT | Type send by listener for serie datapoint count update | -| RESPONSE_OK | General ok response | -| TRAIN_MODEL | Type to execute training for certain model for certain serie | -| FIT_MODEL | Type to fit model for certain serie | -| FORECAST_SERIE | Type to calculate forecast for a certain serie | + +

Enodo

+ +# Enodo ### Listener @@ -27,15 +9,15 @@ The enode listener listens to pipe socket with siridb server. It sums up the tot It periodically sends an update to the enode hub. The listener only keeps track of series that are registered via an ADD_SERIE of the UPDATE_SERIE message. The listener is seperated from the enode hub, so that it can be placed close to the siridb server, so it can locally access the pipe socket. Every interval for heartbeat and update can be configured with the listener.conf file next to the main.py -### Worker - -The enode worker executes fitting and forecasting models/algorithms. The worker uses significant CPU and thus should be placed on a machine that has low CPU usage. -The worker can create different models (ARIMA/prophet) for series, train models with new data and calculate forecasts for a certain serie. - -### Hub -The enode hub communicateds and guides both the listener as the worker. The hub tells the listener to which series it needs to pay attention to, and it tells the worker which serie should be analysed. -Clients can connect to the hub for receiving updates, and polling for data. Also a client can use the hub to alter the data about which series should be watched. +## Getting started +To get the Enodo Listener setup you need to following the following steps: +### Locally +1. Install dependencies via `pip3 install -r requirements.txt` +2. Setup a .conf file file `python3 main.py --create_config` There will be made a `default.conf` next to the main.py. +3. Fill in the `default.conf` file +4. Call `python3 main.py --config=default.conf` to start the hub. +5. You can also setup the config by environment variables. These names are identical to those in the default.conf file, except all uppercase. \ No newline at end of file diff --git a/test/bytes.py b/test/bytes.py deleted file mode 100644 index 2a5ec87..0000000 --- a/test/bytes.py +++ /dev/null @@ -1,14 +0,0 @@ -from lib.socket.package import * - -# print((2).to_bytes(32, byteorder='big')) -# exit() - -header = b'' + (11).to_bytes(32, byteorder='big') + (1).to_bytes(8, byteorder='big') + (2).to_bytes(8, byteorder='big') - -print('hallo'.encode('utf-8')) - -print(header) - -size, type, id = read_header(header) - -print(size, type, id) diff --git a/test/senddata.py b/test/senddata.py deleted file mode 100644 index 9409769..0000000 --- a/test/senddata.py +++ /dev/null @@ -1,29 +0,0 @@ -import asyncio -import json -import time -import random -from siridb.connector import SiriDBClient - -async def example(siri): - # Start connecting to siridb. - # .connect() returns a list of all connections referring to the supplied - # hostlist. The list can contain exceptions in case a connection could not - # be made. - await siri.connect() - - try: - await siri.insert({'hub_test1': [[1560350480175, 1]]}) - finally: - # Close all siridb connections. - siri.close() - - -siri = SiriDBClient( - username='iris', - password='siri', - dbname='testdata_1', - hostlist=[('localhost', 9000)], # Multiple connections are supported - keepalive=True) - -loop = asyncio.get_event_loop() -loop.run_until_complete(example(siri)) \ No newline at end of file diff --git a/test/server.py b/test/server.py deleted file mode 100644 index 9f6af63..0000000 --- a/test/server.py +++ /dev/null @@ -1,77 +0,0 @@ -import asyncio -import json - -from lib.socket.package import * - -connected_listeners = [] -connected_workers = [] - -series = ('hub_test1', 'hub_test2') - - -async def handle_echo(reader, writer): - - connected = True - - while connected: - packet_type, packet_id, data = await read_packet(reader) - - addr = writer.get_extra_info('peername') - print("Received %r from %r" % (packet_id, addr)) - if packet_id == 0: - connected = False - - if packet_type == HANDSHAKE_LISTENER: - client_id = data.decode("utf-8") - connected_listeners.append(client_id) - print(f'New listener with id: {client_id}') - response = create_header(0, HANDSHAKE_OK, packet_id) - writer.write(response) - - update = json.dumps(series) - series_update = create_header(len(update), UPDATE_SERIES, packet_id) - writer.write(series_update + update.encode("utf-8")) - - if packet_type == HANDSHAKE_WORKER: - client_id = data.decode("utf-8") - connected_workers.append(client_id) - print(f'New worker with id: {client_id}') - response = create_header(0, HANDSHAKE_OK, packet_id) - writer.write(response) - - if packet_type == HEARTBEAT: - client_id = data.decode("utf-8") - print(f'Heartbeat from worker/listener with id: {client_id}') - response = create_header(0, HEARTBEAT, packet_id) - writer.write(response) - - if packet_type == LISTENER_ADD_SERIE_COUNT: - data = json.loads(data.decode("utf-8")) - print(f'Update from listener with id: {client_id}') - print(data) - response = create_header(0, REPONSE_OK, packet_id) - writer.write(response) - - await writer.drain() - - print("Close the client socket") - writer.close() - - - - -loop = asyncio.get_event_loop() -coro = asyncio.start_server(handle_echo, '127.0.0.1', 9103, loop=loop) -server = loop.run_until_complete(coro) - -# Serve requests until Ctrl+C is pressed -print('Serving on {}'.format(server.sockets[0].getsockname())) -try: - loop.run_forever() -except KeyboardInterrupt: - pass - -# Close the server -server.close() -loop.run_until_complete(server.wait_closed()) -loop.close() diff --git a/version.py b/version.py index 579fd47..d497b6e 100644 --- a/version.py +++ b/version.py @@ -1 +1 @@ -VERSION = '0.0.1' \ No newline at end of file +VERSION = '0.1.0-beta2.0' \ No newline at end of file From 3800e9bc23491612a7decb0a59b7769b53947d3d Mon Sep 17 00:00:00 2001 From: timoj Date: Wed, 3 Mar 2021 13:28:11 +0100 Subject: [PATCH 2/2] Create CHANGELOG.md --- CHANGELOG.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 0000000..f8cc3a2 --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,24 @@ + +# Change Log +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](http://keepachangelog.com/) +and this project adheres to [Semantic Versioning](http://semver.org/). + +## [Unreleased] - yyyy-mm-dd + + +## [0.1.0-beta2.0] - 2021-03-04 + +Migration: config file changed, all siridb related config is in a seperate settings.enodo file in the root of the data folder you give up within the conf file. + +### Added + +- Environment variable support +- Default config can be created by running listener with `--create_config` + +### Changed + + +### Fixed + \ No newline at end of file