From 72bf39f5d84b0201e625a2af442ee7a9da091b09 Mon Sep 17 00:00:00 2001 From: timoj Date: Tue, 3 May 2022 21:08:24 +0200 Subject: [PATCH 1/6] removed password from get settings response --- CHANGELOG.md | 5 +++++ lib/api/apihandlers.py | 4 ++-- lib/config.py | 28 ++++++++++++++++++++++------ lib/webserver/basehandler.py | 2 +- 4 files changed, 30 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c7c9401..913a618 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [unreleased] - yyyy-mm-dd + +### Changed +- Removed siridb password from get settings base handler output + ## [0.1.0-beta3.2.3] - 2022-05-02 ### Added diff --git a/lib/api/apihandlers.py b/lib/api/apihandlers.py index 8ed6bef..987da30 100644 --- a/lib/api/apihandlers.py +++ b/lib/api/apihandlers.py @@ -244,7 +244,7 @@ async def get_siridb_enodo_status(cls, request): @classmethod async def get_enodo_readiness(cls, request): - """Get status of this analyser instance + """Get ready status of this hub instance Args: request (Request): aiohttp request @@ -259,7 +259,7 @@ async def get_enodo_readiness(cls, request): @classmethod async def get_enodo_liveness(cls, request): - """Get liveness status of this analyser instance + """Get liveness status of this hub instance Args: request (Request): aiohttp request diff --git a/lib/config.py b/lib/config.py index 61c919d..72e7a92 100644 --- a/lib/config.py +++ b/lib/config.py @@ -238,16 +238,16 @@ def setup_config_variables(cls): # Enodo cls.basic_auth_username = cls._config.get_r( - 'hub', 'basic_auth_username', required=False, default=None) + 'hub', 'basic_auth_username', required=False, default=None) cls.basic_auth_password = cls._config.get_r( - 'hub', 'basic_auth_password', required=False, default=None) + 'hub', 'basic_auth_password', required=False, default=None) cls.client_max_timeout = cls.to_int( cls._config.get_r('hub', 'client_max_timeout')) if cls.client_max_timeout < 35: # min value enforcement cls.client_max_timeout = 35 cls.socket_server_host = cls._config.get_r( - 'hub', 'internal_socket_server_hostname') + 'hub', 'internal_socket_server_hostname') cls.socket_server_port = cls.to_int( cls._config.get_r('hub', 'internal_socket_server_port')) cls.save_to_disk_interval = cls.to_int( @@ -265,7 +265,7 @@ def setup_config_variables(cls): default='false'), False) cls.base_dir = cls._config.get_r( - 'hub', 'base_path') + 'hub', 'base_path') cls.disable_safe_mode = cls.to_bool( cls._config.get_r('hub', 'disable_safe_mode'), False) cls.series_save_path = os.path.join( @@ -328,9 +328,25 @@ def to_bool(v, default): else: return default + @staticmethod + def _remove_dict_key_recursive(data, keys): + Config._remove_dict_key_recursive( + data[keys[0]], + keys[1:]) if len(keys) > 1 else data.pop( + keys[0], + None) + @classmethod - def get_settings(cls): - return cls._settings._sections + def get_settings(cls, include_secrets=True): + if not include_secrets: + secret_paths = [ + ["siridb", "password"], + ["siridb_output", "password"]] + data = cls._settings._sections + for secret in secret_paths: + cls._remove_dict_key_recursive(data, secret) + + return data @staticmethod def is_runtime_configurable(section, key): diff --git a/lib/webserver/basehandler.py b/lib/webserver/basehandler.py index 8fb04c9..4abec11 100644 --- a/lib/webserver/basehandler.py +++ b/lib/webserver/basehandler.py @@ -306,7 +306,7 @@ async def resp_get_enodo_hub_status(cls): @classmethod async def resp_get_enodo_config(cls): - return {'data': Config.get_settings()} + return {'data': Config.get_settings(include_secrets=False)} @classmethod async def resp_set_config(cls, data): From a5ce1cae31424807d7479462375839555cb103aa Mon Sep 17 00:00:00 2001 From: timoj Date: Tue, 3 May 2022 23:08:56 +0200 Subject: [PATCH 2/6] cleanup --- .gitignore | 1 + docs/analysing_flow.md | 15 ---- docs/websockets.md | 23 ------- lib/analyser/__init__.py | 0 lib/analyser/analyserwrapper.py | 20 ------ lib/api/__init__.py | 0 .../enodoeventmanager.py => eventmanager.py} | 0 lib/events/__init__.py | 2 - lib/{enodojobmanager.py => jobmanager.py} | 5 +- lib/{analyser/model.py => modulemanager.py} | 0 lib/series/series.py | 4 +- lib/series/seriesmanager.py | 7 +- lib/siridb/package.py | 23 ------- lib/siridb/pipeserver.py | 26 ------- lib/siridb/protocol.py | 37 ---------- lib/socket/clientmanager.py | 8 +-- lib/socket/package.py | 69 ------------------- lib/socket/socketserver.py | 4 +- lib/{api => webserver}/apihandlers.py | 0 lib/webserver/basehandler.py | 7 +- lib/webserver/routes.py | 2 +- server.py | 14 ++-- 22 files changed, 27 insertions(+), 240 deletions(-) delete mode 100644 docs/analysing_flow.md delete mode 100644 docs/websockets.md delete mode 100644 lib/analyser/__init__.py delete mode 100644 lib/analyser/analyserwrapper.py delete mode 100644 lib/api/__init__.py rename lib/{events/enodoeventmanager.py => eventmanager.py} (100%) delete mode 100644 lib/events/__init__.py rename lib/{enodojobmanager.py => jobmanager.py} (99%) rename lib/{analyser/model.py => modulemanager.py} (100%) delete mode 100644 lib/siridb/package.py delete mode 100644 lib/siridb/pipeserver.py delete mode 100644 lib/siridb/protocol.py delete mode 100644 lib/socket/package.py rename lib/{api => webserver}/apihandlers.py (100%) diff --git a/.gitignore b/.gitignore index 6b3abb7..807fcef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ test test/* build +dist .idea .idea/* .idea/vcs.xml diff --git a/docs/analysing_flow.md b/docs/analysing_flow.md deleted file mode 100644 index 5c31b32..0000000 --- a/docs/analysing_flow.md +++ /dev/null @@ -1,15 +0,0 @@ -

Enodo

- -# Analysing flow - -## Job types - -- Base analysis -- Forecast -- Anomaly detection - -## Job flow - -Possible flow: - -- [Base analysis] -> [Anomaly detection/Forecast] \ No newline at end of file diff --git a/docs/websockets.md b/docs/websockets.md deleted file mode 100644 index 4303643..0000000 --- a/docs/websockets.md +++ /dev/null @@ -1,23 +0,0 @@ -

Enodo

- -# Hub Websockets API - -## CRUD on resources - -| Resource | C | R | U | D | -| ------------- |:-:|:-:|:-:|:-:| -| Series | ✅ | ✅ | ✅ | ✅ | -| Series Details | ❌ | ✅ | ❌ | ❌ | -| Event Output | ✅ | ✅ | ✅ | ✅ | -| Modules | ❌ | ✅ | ❌ | ❌ | -| Status | ❌ | ✅ | ❌ | ❌ | -| Open Job| ❌ | ✅ | ❌ | ❌ | -| Active Job| ❌ | ✅ | ❌ | ❌ | -| Failed Job| ❌ | ✅ | ❌ | ❌ | - - -## Subscribe on resources -- Series (all and filtered) -- Open Jobs -- Modules -- Event Outputs \ No newline at end of file diff --git a/lib/analyser/__init__.py b/lib/analyser/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lib/analyser/analyserwrapper.py b/lib/analyser/analyserwrapper.py deleted file mode 100644 index cbf50ad..0000000 --- a/lib/analyser/analyserwrapper.py +++ /dev/null @@ -1,20 +0,0 @@ -GENERAL_PARAMETERS = { - 'forecast_points_in_future': 10, - 'use_data_since_timestamp': None, -} - - -async def setup_default_module_arguments(module_arguments): - """Setup default module params - - Args: - module_arguments (dict): module params - - Returns: - dict: params - """ - for key in GENERAL_PARAMETERS: - if key not in module_arguments: - module_arguments[key] = GENERAL_PARAMETERS[key] - - return module_arguments diff --git a/lib/api/__init__.py b/lib/api/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lib/events/enodoeventmanager.py b/lib/eventmanager.py similarity index 100% rename from lib/events/enodoeventmanager.py rename to lib/eventmanager.py diff --git a/lib/events/__init__.py b/lib/events/__init__.py deleted file mode 100644 index 1ed06fa..0000000 --- a/lib/events/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from .enodoeventmanager import EnodoEventManager -from .enodoeventmanager import EnodoEvent diff --git a/lib/enodojobmanager.py b/lib/jobmanager.py similarity index 99% rename from lib/enodojobmanager.py rename to lib/jobmanager.py index 4124400..f33157e 100644 --- a/lib/enodojobmanager.py +++ b/lib/jobmanager.py @@ -9,14 +9,15 @@ from enodo.model.config.series import SeriesJobConfigModel from enodo.protocol.packagedata import EnodoJobDataModel, \ EnodoJobRequestDataModel +from enodo.protocol.package import create_header, WORKER_JOB, \ + WORKER_JOB_CANCEL -from .events.enodoeventmanager import EnodoEvent, EnodoEventManager, \ +from .eventmanager import EnodoEvent, EnodoEventManager, \ ENODO_EVENT_JOB_QUEUE_TOO_LONG, ENODO_EVENT_STATIC_RULE_FAIL from .config import Config from .series.seriesmanager import SeriesManager from .serverstate import ServerState from .socket import ClientManager -from .socket.package import create_header, WORKER_JOB, WORKER_JOB_CANCEL from .socketio import SUBSCRIPTION_CHANGE_TYPE_UPDATE from lib.util import load_disk_data, save_disk_data from .socketio import SUBSCRIPTION_CHANGE_TYPE_DELETE, \ diff --git a/lib/analyser/model.py b/lib/modulemanager.py similarity index 100% rename from lib/analyser/model.py rename to lib/modulemanager.py diff --git a/lib/series/series.py b/lib/series/series.py index 6287779..498cb16 100644 --- a/lib/series/series.py +++ b/lib/series/series.py @@ -35,7 +35,7 @@ async def get_datapoints_counter_lock(self): def get_errors(self): # To stop circular import - from ..enodojobmanager import EnodoJobManager + from ..jobmanager import EnodoJobManager errors = [ job.error for job in EnodoJobManager.get_failed_jobs_for_series( @@ -44,7 +44,7 @@ def get_errors(self): def is_ignored(self): # To stop circular import - from ..enodojobmanager import EnodoJobManager + from ..jobmanager import EnodoJobManager return EnodoJobManager.has_series_failed_jobs(self.name) async def get_module(self, job_name): diff --git a/lib/series/seriesmanager.py b/lib/series/seriesmanager.py index ef09449..c6307b8 100644 --- a/lib/series/seriesmanager.py +++ b/lib/series/seriesmanager.py @@ -4,17 +4,16 @@ import re import qpack +from enodo.protocol.package import create_header, UPDATE_SERIES from lib.config import Config -from lib.events import EnodoEvent -from lib.events.enodoeventmanager import ENODO_EVENT_ANOMALY_DETECTED,\ - EnodoEventManager +from lib.eventmanager import ENODO_EVENT_ANOMALY_DETECTED,\ + EnodoEventManager, EnodoEvent from lib.serverstate import ServerState from lib.siridb.siridb import query_series_datapoint_count,\ drop_series, insert_points, query_series_data, does_series_exist,\ query_group_expression_by_name from lib.socket import ClientManager -from lib.socket.package import create_header, UPDATE_SERIES from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_ADD,\ SUBSCRIPTION_CHANGE_TYPE_DELETE from lib.util import load_disk_data, save_disk_data diff --git a/lib/siridb/package.py b/lib/siridb/package.py deleted file mode 100644 index c0fe34b..0000000 --- a/lib/siridb/package.py +++ /dev/null @@ -1,23 +0,0 @@ -import struct -import qpack - - -class Package: - - __slots__ = ('pid', 'length', 'tipe', 'checkbit', 'data') - - struct_datapackage = struct.Struct(' 0: diff --git a/lib/socket/package.py b/lib/socket/package.py deleted file mode 100644 index 0b218d8..0000000 --- a/lib/socket/package.py +++ /dev/null @@ -1,69 +0,0 @@ -import socket - -# MESSAGE TYPES - -HANDSHAKE = 1 -HANDSHAKE_OK = 2 -HANDSHAKE_FAIL = 3 -UNKNOWN_CLIENT = 4 -HEARTBEAT = 5 -SHUTDOWN = 6 -CLIENT_SHUTDOWN = 7 - -ADD_SERIES = 8 -REMOVE_SERIES = 9 -LISTENER_ADD_SERIES = 10 -LISTENER_REMOVE_SERIES = 11 -LISTENER_NEW_SERIES_POINTS = 12 -UPDATE_SERIES = 13 - -REPONSE_OK = 14 - - -WORKER_JOB = 15 -WORKER_JOB_RESULT = 16 -WORKER_JOB_CANCEL = 21 -WORKER_JOB_CANCELLED = 22 -WORKER_UPDATE_BUSY = 23 -WORKER_REFUSED = 24 - - -''' -Header: -size, int, 32bit -type, int 8bit -packetid, int 8bit - -total header length = 48 bits -''' - -PACKET_HEADER_LEN = 6 - - -async def read_packet(sock, header_data=None): - if isinstance(sock, socket.socket): - if header_data is None: - header_data = sock.recv(PACKET_HEADER_LEN) - body_size, packet_type, packet_id = read_header(header_data) - return packet_type, packet_id, sock.recv(body_size) - else: - if header_data is None: - header_data = await sock.read(PACKET_HEADER_LEN) - body_size, packet_type, packet_id = read_header(header_data) - return packet_type, packet_id, await sock.read(body_size) - - -def create_header(size, type, id): - return size.to_bytes(4, byteorder='big') + \ - type.to_bytes(1, byteorder='big') + \ - id.to_bytes(1, byteorder='big') - - -def read_header(binary_data): - return int.from_bytes( - binary_data[: 4], - 'big'), int.from_bytes( - binary_data[4: 5], - 'big'), int.from_bytes( - binary_data[5: 6], - 'big') diff --git a/lib/socket/socketserver.py b/lib/socket/socketserver.py index 03401c1..757e7e3 100644 --- a/lib/socket/socketserver.py +++ b/lib/socket/socketserver.py @@ -4,11 +4,13 @@ from packaging import version import qpack +from enodo.protocol.package import create_header, read_packet, HEARTBEAT, \ + HANDSHAKE, HANDSHAKE_FAIL, UNKNOWN_CLIENT, CLIENT_SHUTDOWN, \ + HANDSHAKE_OK, UPDATE_SERIES from lib.series.seriesmanager import SeriesManager from lib.serverstate import ServerState from . import ClientManager -from .package import * ENODO_HUB_WORKER_MIN_VERSION = "0.1.0-beta3.0" diff --git a/lib/api/apihandlers.py b/lib/webserver/apihandlers.py similarity index 100% rename from lib/api/apihandlers.py rename to lib/webserver/apihandlers.py diff --git a/lib/webserver/basehandler.py b/lib/webserver/basehandler.py index 4abec11..06b95fd 100644 --- a/lib/webserver/basehandler.py +++ b/lib/webserver/basehandler.py @@ -1,5 +1,4 @@ from aiohttp import web -import logging from siridb.connector.lib.exceptions import QueryError, InsertError, \ ServerError, PoolError, AuthenticationError, UserAuthError @@ -8,14 +7,14 @@ from version import VERSION -from lib.analyser.model import EnodoModuleManager -from lib.events import EnodoEventManager +from lib.modulemanager import EnodoModuleManager +from lib.eventmanager import EnodoEventManager from lib.series.seriesmanager import SeriesManager from lib.serverstate import ServerState from lib.siridb.siridb import query_series_anomalies, query_series_forecasts, \ query_series_static_rules_hits from lib.util import regex_valid -from lib.enodojobmanager import EnodoJobManager, EnodoJob +from lib.jobmanager import EnodoJobManager, EnodoJob from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_UPDATE from lib.config import Config from lib.socket.clientmanager import ClientManager diff --git a/lib/webserver/routes.py b/lib/webserver/routes.py index 9d848a9..17c6325 100644 --- a/lib/webserver/routes.py +++ b/lib/webserver/routes.py @@ -1,4 +1,4 @@ -from lib.api.apihandlers import ApiHandlers +from lib.webserver.apihandlers import ApiHandlers def setup_routes(app, cors): diff --git a/server.py b/server.py index 8e0b05a..cfd5887 100644 --- a/server.py +++ b/server.py @@ -10,13 +10,16 @@ from aiohttp import web from aiojobs.aiohttp import setup from enodo.protocol.packagedata import * +from enodo.protocol.package import LISTENER_NEW_SERIES_POINTS, \ + WORKER_UPDATE_BUSY, WORKER_JOB_RESULT, WORKER_REFUSED, \ + WORKER_JOB_CANCELLED from enodo.jobs import JOB_STATUS_NONE, JOB_STATUS_DONE -from lib.analyser.model import EnodoModuleManager -from lib.api.apihandlers import ApiHandlers, auth +from lib.modulemanager import EnodoModuleManager +from lib.webserver.apihandlers import ApiHandlers, auth from lib.config import Config -from lib.events.enodoeventmanager import EnodoEventManager -from lib.enodojobmanager import EnodoJobManager +from lib.eventmanager import EnodoEventManager +from lib.jobmanager import EnodoJobManager from lib.logging import prepare_logger from lib.series.seriesmanager import SeriesManager @@ -24,9 +27,6 @@ from lib.socket import ClientManager from lib.socket.handler import receive_new_series_points, \ receive_worker_status_update, received_worker_refused -from lib.socket.package import LISTENER_NEW_SERIES_POINTS, \ - WORKER_UPDATE_BUSY, WORKER_JOB_RESULT, WORKER_REFUSED, \ - WORKER_JOB_CANCELLED from lib.socket.socketserver import SocketServer from lib.socketio.socketiohandlers import SocketIoHandler from lib.socketio.socketiorouter import SocketIoRouter From 2813b5df404756434a56c22485de73d2e40c9c9f Mon Sep 17 00:00:00 2001 From: timoj Date: Wed, 4 May 2022 09:50:07 +0200 Subject: [PATCH 3/6] refactored locks for managers --- lib/eventmanager.py | 24 +++------ lib/jobmanager.py | 113 +++++++++++++++++-------------------------- lib/series/series.py | 17 ++----- lib/util/__init__.py | 1 + lib/util/util.py | 14 ++++++ 5 files changed, 69 insertions(+), 100 deletions(-) diff --git a/lib/eventmanager.py b/lib/eventmanager.py index 3b07fcc..22a92c8 100644 --- a/lib/eventmanager.py +++ b/lib/eventmanager.py @@ -12,7 +12,7 @@ from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_ADD, \ SUBSCRIPTION_CHANGE_TYPE_UPDATE, SUBSCRIPTION_CHANGE_TYPE_DELETE from lib.serverstate import ServerState -from lib.util import save_disk_data, load_disk_data +from lib.util import save_disk_data, load_disk_data, cls_lock ENODO_EVENT_ANOMALY_DETECTED = "event_anomaly_detected" ENODO_EVENT_JOB_QUEUE_TOO_LONG = "job_queue_too_long" @@ -206,7 +206,7 @@ class EnodoEventManager: outputs = None # Next id is always current. will be incremented when setting new id _next_output_id = None - _locked = False + _lock = None _max_output_id = None @classmethod @@ -214,24 +214,14 @@ async def async_setup(cls): cls.outputs = [] cls._next_output_id = 0 cls._max_output_id = 1000 + cls._lock = asyncio.Lock() @classmethod - async def _lock(cls): - while cls._locked is True: - await asyncio.sleep(0.1) - cls._locked = True - - @classmethod - async def _unlock(cls): - cls._locked = False - - @classmethod + @cls_lock() async def _get_next_output_id(cls): - await cls._lock() if cls._next_output_id + 1 >= cls._max_output_id: cls._next_output_id = 0 cls._next_output_id += 1 - await cls._unlock() return cls._next_output_id @classmethod @@ -261,20 +251,18 @@ async def remove_event_output(cls, output_id): return False @classmethod + @cls_lock() async def _remove_event_output(cls, output): - await cls._lock() cls.outputs.remove(output) await internal_updates_event_output_subscribers( SUBSCRIPTION_CHANGE_TYPE_DELETE, output.rid) - await cls._unlock() @classmethod + @cls_lock() async def _update_event_output(cls, output, data): - await cls._lock() output.update(data) await internal_updates_event_output_subscribers( SUBSCRIPTION_CHANGE_TYPE_UPDATE, output.to_dict()) - await cls._unlock() @classmethod async def handle_event(cls, event, series=None): diff --git a/lib/jobmanager.py b/lib/jobmanager.py index f33157e..4086b5d 100644 --- a/lib/jobmanager.py +++ b/lib/jobmanager.py @@ -19,7 +19,7 @@ from .serverstate import ServerState from .socket import ClientManager from .socketio import SUBSCRIPTION_CHANGE_TYPE_UPDATE -from lib.util import load_disk_data, save_disk_data +from lib.util import load_disk_data, save_disk_data, cls_lock from .socketio import SUBSCRIPTION_CHANGE_TYPE_DELETE, \ SUBSCRIPTION_CHANGE_TYPE_ADD @@ -66,7 +66,7 @@ class EnodoJobManager: _max_job_id = 1000 _max_job_timeout = 60 * 5 _next_job_id = None - _locked = False + _lock = None _max_in_queue_before_warning = None _update_queue_cb = None @@ -77,6 +77,7 @@ async def async_setup(cls, update_queue_cb): cls._update_queue_cb = update_queue_cb cls._max_in_queue_before_warning = Config.max_in_queue_before_warning + cls._lock = asyncio.Lock() @classmethod async def _build_index(cls): @@ -85,24 +86,13 @@ async def _build_index(cls): cls._active_jobs_index[job.rid] = job @classmethod + @cls_lock() async def _get_next_job_id(cls): - await cls._lock() if cls._next_job_id + 1 >= cls._max_job_id: cls._next_job_id = 0 cls._next_job_id += 1 - cls._unlock() return cls._next_job_id - @classmethod - async def _lock(cls): - while cls._locked is True: - await asyncio.sleep(0.1) - cls._locked = True - - @classmethod - def _unlock(cls): - cls._locked = False - @classmethod def get_active_jobs(cls): return cls._active_jobs @@ -133,9 +123,8 @@ async def clear_jobs(cls): for job in cls._active_jobs: jobs.append(job) for job in jobs: - await cls._lock() - cls._deactivate_job(job) - cls._unlock() + async with cls._lock: + cls._deactivate_job(job) await cls._send_worker_cancel_job(job.worker_id, job.rid) series = await SeriesManager.get_series(job.series_name) await series.set_job_status(job.job_config.config_name, @@ -192,9 +181,8 @@ def remove_failed_jobs_for_series(cls, series_name): cls._failed_jobs.remove(job) @classmethod + @cls_lock() async def activate_job(cls, job_id, worker_id): - await cls._lock() - j = None for job in cls._open_jobs: if job.rid == job_id: @@ -203,8 +191,6 @@ async def activate_job(cls, job_id, worker_id): if j is not None: await cls._activate_job(j, worker_id) - cls._unlock() - @classmethod async def _activate_job(cls, job, worker_id): if job is None or worker_id is None: @@ -229,9 +215,8 @@ async def get_activated_job(cls, job_id): return None @classmethod + @cls_lock() async def deactivate_job(cls, job_id): - await cls._lock() - j = None for job in cls._active_jobs: if job.rid == job_id: @@ -239,7 +224,6 @@ async def deactivate_job(cls, job_id): break cls._deactivate_job(j) - cls._unlock() @classmethod def _deactivate_job(cls, job): @@ -248,20 +232,17 @@ def _deactivate_job(cls, job): del cls._active_jobs_index[job.rid] @classmethod + @cls_lock() async def cancel_job(cls, job): - await cls._lock() - if job in cls._active_jobs: cls._active_jobs.remove(job) del cls._active_jobs_index[job.rid] cls._open_jobs.append(job) - cls._unlock() @classmethod + @cls_lock() async def cancel_jobs_for_series(cls, series_name): - await cls._lock() await cls._cancel_jobs_for_series(series_name) - cls._unlock() @classmethod async def _cancel_jobs_for_series(cls, series_name): @@ -289,16 +270,14 @@ async def _cancel_jobs_for_series(cls, series_name): SUBSCRIPTION_CHANGE_TYPE_DELETE, job.rid) @classmethod + @cls_lock() async def set_job_failed(cls, job_id, error): - await cls._lock() - j = None for job in cls._active_jobs: if job.rid == job_id: j = job break await cls._set_job_failed(j, error) - cls._unlock() @classmethod async def _set_job_failed(cls, job, error): @@ -311,15 +290,13 @@ async def _set_job_failed(cls, job, error): cls._failed_jobs.append(job) @classmethod + @cls_lock() async def clean_jobs(cls): - await cls._lock() - for job in cls._active_jobs: now = datetime.datetime.now() if (now - job.send_at).total_seconds() > cls._max_job_timeout: await cls._set_job_failed(job, "Job timed-out") await cls._send_worker_cancel_job(job.worker_id, job.rid) - cls._unlock() if len(cls._open_jobs) > cls._max_in_queue_before_warning: event = EnodoEvent( @@ -330,6 +307,35 @@ async def clean_jobs(cls): ENODO_EVENT_JOB_QUEUE_TOO_LONG) await EnodoEventManager.handle_event(event) + @classmethod + @cls_lock() + async def _try_activate_job(cls, next_job): + try: + series = await SeriesManager.get_series( + next_job.series_name) + if series is None: + return + + worker = await ClientManager.get_free_worker( + next_job.series_name, next_job.job_config.job_type, + await series.get_module( + next_job.job_config.config_name)) + if worker is None: + return + + logging.info( + f"Adding series: sending {next_job.series_name} to " + f"Worker for job type {next_job.job_config.job_type}") + await cls._send_worker_job_request(worker, next_job) + worker.is_going_busy = True + await cls._activate_job(next_job, worker.client_id) + except Exception as e: + logging.error( + "Something went wrong when trying to activate job") + logging.debug( + f"Corresponding error: {e}, " + f'exception class: {e.__class__.__name__}') + @classmethod async def check_for_jobs(cls): while ServerState.work_queue: @@ -341,34 +347,8 @@ async def check_for_jobs(cls): continue for next_job in cls._open_jobs: - try: - await cls._lock() - series = await SeriesManager.get_series( - next_job.series_name) - if series is None: - continue - - worker = await ClientManager.get_free_worker( - next_job.series_name, next_job.job_config.job_type, - await series.get_module( - next_job.job_config.config_name)) - if worker is None: - continue - - logging.info( - f"Adding series: sending {next_job.series_name} to " - f"Worker for job type {next_job.job_config.job_type}") - await cls._send_worker_job_request(worker, next_job) - worker.is_going_busy = True - await cls._activate_job(next_job, worker.client_id) - except Exception as e: - logging.error( - "Something went wrong when trying to activate job") - logging.debug( - f"Corresponding error: {e}, " - f'exception class: {e.__class__.__name__}') - finally: - cls._unlock() + await cls._try_activate_job(next_job) + await cls.clean_jobs() await asyncio.sleep(Config.watcher_interval) @@ -540,8 +520,8 @@ async def get_open_queue(cls): return [EnodoJob.to_dict(job) for job in cls._open_jobs] @classmethod + @cls_lock() async def save_to_disk(cls): - await cls._lock() try: job_data = { 'next_job_id': cls._next_job_id, @@ -554,12 +534,11 @@ async def save_to_disk(cls): f"Something went wrong when saving jobmanager data to disk") logging.debug(f"Corresponding error: {e}, " f'exception class: {e.__class__.__name__}') - cls._unlock() @classmethod + @cls_lock() async def load_from_disk(cls): loaded_failed_jobs = 0 - await cls._lock() try: if not os.path.exists(Config.jobs_save_path): raise Exception() @@ -580,5 +559,3 @@ async def load_from_disk(cls): logging.info( f'Loaded {loaded_failed_jobs} failed jobs from disk') - - cls._unlock() diff --git a/lib/series/series.py b/lib/series/series.py index 498cb16..df0b5fd 100644 --- a/lib/series/series.py +++ b/lib/series/series.py @@ -1,3 +1,4 @@ +import asyncio import time from enodo.jobs import JOB_TYPE_BASE_SERIES_ANALYSIS, JOB_STATUS_NONE, \ @@ -6,7 +7,6 @@ class Series: - # detecting_anomalies_status forecast_status series_analysed_status __slots__ = ('rid', 'name', 'series_config', 'state', '_datapoint_count_lock', 'series_characteristics') @@ -20,18 +20,7 @@ def __init__(self, name, config, state=None, self.state = SeriesState(**state) self.series_characteristics = series_characteristics - self._datapoint_count_lock = False - - async def set_datapoints_counter_lock(self, is_locked): - """ - Set lock so it can or can not be changed - :param is_locked: - :return: - """ - self._datapoint_count_lock = is_locked - - async def get_datapoints_counter_lock(self): - return self._datapoint_count_lock + self._datapoint_count_lock = asyncio.Lock() def get_errors(self): # To stop circular import @@ -83,7 +72,7 @@ async def add_to_datapoints_count(self, add_to_count): :param add_to_count: :return: """ - if self._datapoint_count_lock is False: + async with self._datapoint_count_lock: self.state.datapoint_count += add_to_count async def schedule_job(self, job_config_name, initial=False): diff --git a/lib/util/__init__.py b/lib/util/__init__.py index 968afd2..f57c483 100644 --- a/lib/util/__init__.py +++ b/lib/util/__init__.py @@ -3,3 +3,4 @@ from .util import regex_valid from .util import load_disk_data from .util import save_disk_data +from .util import cls_lock diff --git a/lib/util/util.py b/lib/util/util.py index 0e45244..81aa278 100644 --- a/lib/util/util.py +++ b/lib/util/util.py @@ -1,4 +1,5 @@ import datetime +import functools import json import logging import re @@ -65,3 +66,16 @@ def save_disk_data(path, data): f = open(path, "w+") f.write(json.dumps(save_data, default=safe_json_dumps)) f.close() + + +def cls_lock(): + def wrapper(func): + @functools.wraps(func) + async def wrapped(*args, **kwargs): + if len(args) > 0 and hasattr(args[0], "_lock"): + async with args[0]._lock: + return await func(*args, **kwargs) + else: + raise Exception("Incorrect usage of cls_lock func") + return wrapped + return wrapper From 9d10059ca456ffdf5469c0b02c8533bd3cad8bb7 Mon Sep 17 00:00:00 2001 From: timoj Date: Wed, 4 May 2022 09:58:50 +0200 Subject: [PATCH 4/6] upd changelog --- CHANGELOG.md | 2 ++ lib/series/series.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 913a618..5defc90 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - Removed siridb password from get settings base handler output +- Refactored manager locks +- Removed legacy code ## [0.1.0-beta3.2.3] - 2022-05-02 diff --git a/lib/series/series.py b/lib/series/series.py index df0b5fd..833bc9b 100644 --- a/lib/series/series.py +++ b/lib/series/series.py @@ -19,7 +19,6 @@ def __init__(self, name, config, state=None, state = {} self.state = SeriesState(**state) self.series_characteristics = series_characteristics - self._datapoint_count_lock = asyncio.Lock() def get_errors(self): From 87a2e7798e45aaaf7d33d2397aa234f47dd43160 Mon Sep 17 00:00:00 2001 From: timoj Date: Fri, 6 May 2022 09:30:21 +0200 Subject: [PATCH 5/6] moved settings to config file --- CHANGELOG.md | 3 +- lib/config.py | 89 ++++++++++++++++++++++----------------------------- 2 files changed, 41 insertions(+), 51 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5defc90..438ac07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,12 +5,13 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). -## [unreleased] - yyyy-mm-dd +## [0.1.0-beta3.2.4] - 2022-05-06 ### Changed - Removed siridb password from get settings base handler output - Refactored manager locks - Removed legacy code +- Moved siridb settings to config file, and `max_in_queue_before_warning` and `min_data_points` to the settings file ## [0.1.0-beta3.2.3] - 2022-05-02 diff --git a/lib/config.py b/lib/config.py index 72e7a92..790392f 100644 --- a/lib/config.py +++ b/lib/config.py @@ -20,18 +20,9 @@ 'enable_socket_io_api': 'false', 'disable_safe_mode': 'false' }, - 'events': { - 'max_in_queue_before_warning': '25' - }, 'analyser': { - 'min_data_points': '100', 'watcher_interval': '2', - 'siridb_connection_check_interval': '30', - 'interval_schedules_series': '3600', - } -} - -EMPTY_SETTINGS_FILE = { + }, 'siridb': { 'host': '', 'port': '', @@ -48,6 +39,15 @@ } } +EMPTY_SETTINGS_FILE = { + 'events': { + 'max_in_queue_before_warning': '25' + }, + 'analyser': { + 'min_data_points': '100' + } +} + class EnodoConfigParser(RawConfigParser): def __init__(self, env_support=True, **kwargs): @@ -82,9 +82,7 @@ class Config: _settings = None min_data_points = None watcher_interval = None - siridb_connection_check_interval = None db = None - interval_schedules_series = None # Siridb siridb_host = None @@ -227,14 +225,8 @@ def write_settings(cls): @classmethod def setup_config_variables(cls): - cls.min_data_points = cls.to_int( - cls._config.get_r('analyser', 'min_data_points')) cls.watcher_interval = cls.to_int( cls._config.get_r('analyser', 'watcher_interval')) - cls.siridb_connection_check_interval = cls.to_int( - cls._config.get_r('analyser', 'siridb_connection_check_interval')) - cls.interval_schedules_series = cls.to_int( - cls._config.get_r('analyser', 'interval_schedules_series')) # Enodo cls.basic_auth_username = cls._config.get_r( @@ -276,38 +268,45 @@ def setup_config_variables(cls): cls.base_dir, 'data/outputs.json') cls.module_save_path = os.path.join( cls.base_dir, 'data/modules.json') - cls.max_in_queue_before_warning = cls.to_int( - cls._config.get_r('events', 'max_in_queue_before_warning')) - if not os.path.exists(os.path.join(cls.base_dir, 'data')): - os.makedirs(os.path.join(cls.base_dir, 'data')) - - @classmethod - def setup_settings_variables(cls): # SiriDB - cls.siridb_host = cls._settings.get_r('siridb', 'host') + cls.siridb_host = cls._config.get_r('siridb', 'host') cls.siridb_port = cls.to_int( - cls._settings.get_r('siridb', 'port')) - cls.siridb_user = cls._settings.get_r('siridb', 'user') - cls.siridb_password = cls._settings.get_r('siridb', 'password') - cls.siridb_database = cls._settings.get_r('siridb', 'database') + cls._config.get_r('siridb', 'port')) + cls.siridb_user = cls._config.get_r('siridb', 'user') + cls.siridb_password = cls._config.get_r('siridb', 'password') + cls.siridb_database = cls._config.get_r('siridb', 'database') # SiriDB Forecast - cls.siridb_output_host = cls._settings.get_r( + cls.siridb_output_host = cls._config.get_r( 'siridb_output', 'host') cls.siridb_output_port = cls.to_int( - cls._settings.get_r('siridb_output', 'port')) - cls.siridb_output_user = cls._settings.get_r( + cls._config.get_r('siridb_output', 'port')) + cls.siridb_output_user = cls._config.get_r( 'siridb_output', 'user') - cls.siridb_output_password = cls._settings.get_r( + cls.siridb_output_password = cls._config.get_r( 'siridb_output', 'password') - cls.siridb_output_database = cls._settings.get_r( + cls.siridb_output_database = cls._config.get_r( 'siridb_output', 'database') + if not os.path.exists(os.path.join(cls.base_dir, 'data')): + os.makedirs(os.path.join(cls.base_dir, 'data')) + + @classmethod + def setup_settings_variables(cls): + # TODO set default in one place/overview + cls.max_in_queue_before_warning = cls.to_int(cls._settings.get_r( + 'events', 'max_in_queue_before_warning', + required=False, default=25)) + cls.min_data_points = cls.to_int( + cls._settings.get_r( + 'analyser', 'min_data_points', required=False, + default=100)) + @staticmethod def to_int(val): return_val = None @@ -339,9 +338,7 @@ def _remove_dict_key_recursive(data, keys): @classmethod def get_settings(cls, include_secrets=True): if not include_secrets: - secret_paths = [ - ["siridb", "password"], - ["siridb_output", "password"]] + secret_paths = [] data = cls._settings._sections for secret in secret_paths: cls._remove_dict_key_recursive(data, secret) @@ -351,19 +348,11 @@ def get_settings(cls, include_secrets=True): @staticmethod def is_runtime_configurable(section, key): _is_runtime_configurable = { - "siridb": [ - "host", - "port", - "user", - "password", - "database" + "events": [ + "max_in_queue_before_warning" ], - "siridb_output": [ - "host", - "port", - "user", - "password", - "database" + "analyser": [ + "min_data_points" ] } From 9044a642c8e6de51aed7f3aa6b04146271f85d6f Mon Sep 17 00:00:00 2001 From: timoj Date: Fri, 6 May 2022 09:32:56 +0200 Subject: [PATCH 6/6] Update version.py --- version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.py b/version.py index 4bdad0e..8eb7d58 100644 --- a/version.py +++ b/version.py @@ -1 +1 @@ -VERSION = '0.1.0-beta3.2.3' +VERSION = '0.1.0-beta3.2.4'