From 0e8926edf040762782d349e6d3ed1799bb25efed Mon Sep 17 00:00:00 2001 From: timoj Date: Mon, 2 May 2022 15:36:06 +0200 Subject: [PATCH 1/3] bugfixes --- CHANGELOG.md | 10 +++++- lib/api/apihandlers.py | 7 ++-- lib/config.py | 3 ++ lib/enodojobmanager.py | 4 ++- lib/serverstate.py | 58 ++++++++++++++++++++++---------- lib/siridb/siridb.py | 15 ++++++++- lib/socket/clientmanager.py | 6 ++++ lib/socketio/socketiohandlers.py | 2 +- lib/webserver/basehandler.py | 15 ++++++--- version.py | 2 +- 10 files changed, 91 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b3de871..c7c9401 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## [Unreleased] - yyyy-mm-dd +## [0.1.0-beta3.2.3] - 2022-05-02 + +### Added +- Implemented use_max_points property in job configs + +### Fixed +- Bug with worker is_going_busy status on reconnect +- Fixed return http status on not found when fetching series details +- Fixed issue with siridb connect when changing settings ## [0.1.0-beta3.2.2] - 2022-04-29 diff --git a/lib/api/apihandlers.py b/lib/api/apihandlers.py index 7d788ba..8ed6bef 100644 --- a/lib/api/apihandlers.py +++ b/lib/api/apihandlers.py @@ -56,10 +56,9 @@ async def get_single_monitored_series(cls, request): _type_: _description_ """ series_name = unquote(request.match_info['series_name']) - - return web.json_response( - data=await BaseHandler.resp_get_single_monitored_series( - series_name), dumps=safe_json_dumps) + data, status = await BaseHandler.resp_get_single_monitored_series( + series_name) + return web.json_response(data, dumps=safe_json_dumps, status=status) @classmethod @EnodoAuth.auth.required diff --git a/lib/config.py b/lib/config.py index cdf9b5e..61c919d 100644 --- a/lib/config.py +++ b/lib/config.py @@ -214,7 +214,10 @@ def get_siridb_settings(cls): @classmethod def update_settings(cls, section, key, value): + if cls._settings[section][key] == value: + return True cls._settings[section][key] = value + return False @classmethod def write_settings(cls): diff --git a/lib/enodojobmanager.py b/lib/enodojobmanager.py index a0c9e66..4124400 100644 --- a/lib/enodojobmanager.py +++ b/lib/enodojobmanager.py @@ -432,6 +432,7 @@ async def receive_job_result(cls, writer, packet_type, series.series_characteristics = \ job_response.get('characteristics') series.state.health = job_response.get('health') + series.state.interval = job_response.get('interval') await series.set_job_status( job.job_config.config_name, JOB_STATUS_DONE) await series.schedule_job(job.job_config.config_name) @@ -481,7 +482,8 @@ async def _send_worker_job_request(cls, worker, job): job_data = EnodoJobRequestDataModel( job_id=job.rid, job_config=job.job_config, series_name=job.series_name, - series_config=series.series_config) + series_config=series.series_config, + series_state=series.state) data = qpack.packb(job_data.serialize()) header = create_header(len(data), WORKER_JOB, 0) worker.writer.write(header + data) diff --git a/lib/serverstate.py b/lib/serverstate.py index b198f51..005056b 100644 --- a/lib/serverstate.py +++ b/lib/serverstate.py @@ -1,6 +1,10 @@ +from asyncio import Lock +import logging + from aiojobs import create_scheduler from siridb.connector import SiriDBClient from lib.config import Config +from lib.siridb.siridb import query_time_unit from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_INITIAL @@ -10,19 +14,26 @@ class ServerState: sio = None siridb_data_client = None siridb_output_client = None + siridb_data_client_lock = None + siridb_output_client_lock = None tasks_last_runs = {} siridb_conn_status = {} + siridb_ts_unit = None readiness = None scheduler = None + @classmethod async def async_setup(cls, sio): cls.running = True cls.work_queue = True cls.readiness = False cls.sio = sio + cls.siridb_data_client_lock = Lock() + cls.siridb_output_client_lock = Lock() - await cls.setup_siridb_connection() + await cls.setup_siridb_data_connection() + await cls.setup_siridb_output_connection() cls.tasks_last_runs = { 'watch_series': None, @@ -58,27 +69,36 @@ def _siridb_config_equal(cls, a, b): return True @classmethod - async def setup_siridb_connection(cls): + async def setup_siridb_data_connection(cls): data_config, output_config = Config.get_siridb_settings() - if cls.siridb_data_client is not None: - cls.stop() - - cls.siridb_data_client = SiriDBClient( - **data_config, - keepalive=True) - await cls.siridb_data_client.connect() - if not cls._siridb_config_equal( - data_config, output_config): + logging.info('Setting up SiriDB data Connection') + async with cls.siridb_data_client_lock: + if cls.siridb_data_client is not None: + cls.siridb_data_client.close() + + cls.siridb_data_client = SiriDBClient( + **data_config, + keepalive=True) + await cls.siridb_data_client.connect() + + await cls.refresh_siridb_status() + + @classmethod + async def setup_siridb_output_connection(cls): + data_config, output_config = Config.get_siridb_settings() + + logging.info('Setting up SiriDB output Connection') + async with cls.siridb_output_client_lock: if cls.siridb_output_client is not None: cls.siridb_output_client.close() - cls.siridb_output_client = SiriDBClient( - **output_config, - keepalive=True) - await cls.siridb_output_client.connect() - elif cls.siridb_output_client is not None: - cls.siridb_output_client.close() - cls.siridb_output_client = None + cls.siridb_output_client = None + + if not cls._siridb_config_equal(data_config, output_config): + cls.siridb_output_client = SiriDBClient( + **output_config, + keepalive=True) + await cls.siridb_output_client.connect() await cls.refresh_siridb_status() @@ -106,6 +126,8 @@ def get_siridb_output_conn_status(cls): async def refresh_siridb_status(cls): status = {} status['data_conn'] = cls.get_siridb_data_conn_status() + if status['data_conn']: + cls.siridb_ts_unit = await query_time_unit(cls.siridb_data_client) status['analysis_conn'] = cls.get_siridb_output_conn_status() if status != cls.siridb_conn_status: diff --git a/lib/siridb/siridb.py b/lib/siridb/siridb.py index 6c6bf7e..2a5ae51 100644 --- a/lib/siridb/siridb.py +++ b/lib/siridb/siridb.py @@ -34,7 +34,20 @@ async def does_series_exist(siridb_client, series_name): return exists -# @classmethod +async def query_time_unit(siridb_client): + result = None + try: + result = await siridb_client.query( + f'show time_precision') + except (QueryError, InsertError, ServerError, PoolError, + AuthenticationError, UserAuthError) as e: + print("Connection problem with SiriDB server") + pass + if result is None: + return None + return result["data"][0]["value"] + + async def query_series_data(siridb_client, series_name, selector="*"): result = None try: diff --git a/lib/socket/clientmanager.py b/lib/socket/clientmanager.py index 717d425..5e3a320 100644 --- a/lib/socket/clientmanager.py +++ b/lib/socket/clientmanager.py @@ -85,6 +85,11 @@ def set_config(self, worker_config): def get_config(self): return self.worker_config + async def reconnected(self, ip_address, writer): + await super().reconnected(ip_address, writer) + self.busy = False + self.is_going_busy = False + def to_dict(self): base_dict = super().to_dict() extra_dict = { @@ -220,6 +225,7 @@ async def get_free_worker(cls, series_name, job_type, module_name): job_type, module_name): return worker + for worker_id in cls.workers: worker = cls.workers.get(worker_id) if worker.worker_config.mode == WORKER_MODE_GLOBAL and \ diff --git a/lib/socketio/socketiohandlers.py b/lib/socketio/socketiohandlers.py index a91816d..b42b94a 100644 --- a/lib/socketio/socketiohandlers.py +++ b/lib/socketio/socketiohandlers.py @@ -64,7 +64,7 @@ async def _get_all_series(cls, sid, regex_filter, event): @socketio_auth_required async def get_series_details(cls, sid, data, event): series_name = data.get('series_name') - resp = await BaseHandler.resp_get_single_monitored_series( + resp, status = await BaseHandler.resp_get_single_monitored_series( series_name) return safe_json_dumps(resp) diff --git a/lib/webserver/basehandler.py b/lib/webserver/basehandler.py index 6d07897..56fda37 100644 --- a/lib/webserver/basehandler.py +++ b/lib/webserver/basehandler.py @@ -1,4 +1,5 @@ from aiohttp import web +import logging from siridb.connector.lib.exceptions import QueryError, InsertError, \ ServerError, PoolError, AuthenticationError, UserAuthError @@ -49,9 +50,9 @@ async def resp_get_single_monitored_series(cls, series_name): """ series = await SeriesManager.get_series(series_name) if series is None: - return web.json_response(data={'data': ''}, status=404) + return {'data': ''}, 404 series_data = series.to_dict() - return {'data': series_data} + return {'data': series_data}, 200 @classmethod async def resp_get_series_forecasts(cls, series_name): @@ -319,13 +320,19 @@ async def resp_set_config(cls, data): """ section = data.get('section') keys_and_values = data.get('entries') + changed = False for key in keys_and_values: if Config.is_runtime_configurable(section, key): - Config.update_settings( + changed = changed or Config.update_settings( section, key, keys_and_values[key]) Config.write_settings() Config.setup_settings_variables() - await ServerState.setup_siridb_connection() + + if changed and section == "siridb": + await ServerState.setup_siridb_data_connection() + elif changed and section == "siridb_output": + await ServerState.setup_siridb_output_connection() + return {'data': True} @classmethod diff --git a/version.py b/version.py index 5855c88..4bdad0e 100644 --- a/version.py +++ b/version.py @@ -1 +1 @@ -VERSION = '0.1.0-beta3.2.1' +VERSION = '0.1.0-beta3.2.3' From b4235601c1a294caa9da40f4da748b6a8b699b10 Mon Sep 17 00:00:00 2001 From: timoj Date: Mon, 2 May 2022 15:40:09 +0200 Subject: [PATCH 2/3] linter --- lib/serverstate.py | 3 +-- lib/socket/clientmanager.py | 1 - lib/webserver/basehandler.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/lib/serverstate.py b/lib/serverstate.py index 005056b..ee57fbb 100644 --- a/lib/serverstate.py +++ b/lib/serverstate.py @@ -22,7 +22,6 @@ class ServerState: readiness = None scheduler = None - @classmethod async def async_setup(cls, sio): cls.running = True @@ -93,7 +92,7 @@ async def setup_siridb_output_connection(cls): if cls.siridb_output_client is not None: cls.siridb_output_client.close() cls.siridb_output_client = None - + if not cls._siridb_config_equal(data_config, output_config): cls.siridb_output_client = SiriDBClient( **output_config, diff --git a/lib/socket/clientmanager.py b/lib/socket/clientmanager.py index 5e3a320..b949fa8 100644 --- a/lib/socket/clientmanager.py +++ b/lib/socket/clientmanager.py @@ -225,7 +225,6 @@ async def get_free_worker(cls, series_name, job_type, module_name): job_type, module_name): return worker - for worker_id in cls.workers: worker = cls.workers.get(worker_id) if worker.worker_config.mode == WORKER_MODE_GLOBAL and \ diff --git a/lib/webserver/basehandler.py b/lib/webserver/basehandler.py index 56fda37..8fb04c9 100644 --- a/lib/webserver/basehandler.py +++ b/lib/webserver/basehandler.py @@ -332,7 +332,7 @@ async def resp_set_config(cls, data): await ServerState.setup_siridb_data_connection() elif changed and section == "siridb_output": await ServerState.setup_siridb_output_connection() - + return {'data': True} @classmethod From 2a44b4bbac4255d42d2ce2819bb5ed7506853898 Mon Sep 17 00:00:00 2001 From: timoj Date: Mon, 2 May 2022 15:45:41 +0200 Subject: [PATCH 3/3] Update requirements.txt --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index a9fb293..69e5f55 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,7 +4,7 @@ aiohttp_basicauth==1.0.0 aiohttp_cors==0.7.0 jinja2==3.1.0 psutil==5.9.0 -python-enodo==0.2.14 +python-enodo==0.2.15 python-socketio==5.5.2 qpack==0.0.19 siridb-connector==2.0.8