Skip to content

Commit f12afd0

Browse files
authored
Merge pull request #14 from SiriDB/development
Development
2 parents 41df298 + f649e5b commit f12afd0

20 files changed

+255
-128
lines changed

CHANGELOG.md

+16
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,23 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
77

88
## [Unreleased] - yyyy-mm-dd
99

10+
## [0.1.0-beta2.1] - 2021-03-17
11+
12+
Migration: series config's are changed completely. Removing series.json from the data folder is necessary. You will need to re-add the series after this.
13+
14+
### Added
1015

16+
- Support for series management page in the GUI
17+
18+
### Changed
19+
20+
- Series config is changed. Mainly the job config is setup per job type. This includes `model_params`
21+
- Package size is change, removed unnecessary bytes
22+
23+
### Fixed
24+
25+
- Bug with job management
26+
- Model fetching
1127

1228
## [0.1.0-beta2.0] - 2021-03-04
1329

lib/analyser/model.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import os
33

44
from enodo import EnodoModel
5-
from lib.config.config import Config
5+
from lib.config import Config
66
from lib.util import safe_json_dumps
77
from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_ADD, SUBSCRIPTION_CHANGE_TYPE_DELETE
88

lib/api/apihandlers.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
from aiohttp_basicauth import BasicAuthMiddleware
66

7-
from lib.config.config import Config
7+
from lib.config import Config
88
from lib.socket import ClientManager
99
from lib.util import safe_json_dumps
1010
from lib.webserver.auth import EnodoAuth

lib/config/config.py lib/config.py

+14
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,20 @@ def read_config(cls, path):
189189
cls._settings.read(settings_path)
190190
cls.setup_settings_variables()
191191

192+
@classmethod
193+
def get_siridb_settings(cls):
194+
return {
195+
"username": cls.siridb_user,
196+
"password": cls.siridb_password,
197+
"dbname": cls.siridb_database,
198+
"hostlist": [(cls.siridb_host, cls.siridb_port)]
199+
}, {
200+
"username": cls.siridb_forecast_user,
201+
"password": cls.siridb_forecast_password,
202+
"dbname": cls.siridb_forecast_database,
203+
"hostlist": [(cls.siridb_forecast_host, cls.siridb_forecast_port)]
204+
}
205+
192206
@classmethod
193207
def update_settings(cls, section, key, value):
194208
cls._settings[section][key] = value

lib/config/__init__.py

Whitespace-only changes.

lib/enodojobmanager.py

+13-8
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import logging
55
import os
6+
import datetime
67

78
import qpack
89
from enodo.jobs import *
@@ -11,7 +12,7 @@
1112
WORKER_MODE_DEDICATED_SERIES
1213

1314
from .events.enodoeventmanager import EnodoEvent, EnodoEventManager, ENODO_EVENT_JOB_QUEUE_TOO_LONG, ENODO_EVENT_STATIC_RULE_FAIL
14-
from .config.config import Config
15+
from .config import Config
1516
from .series.seriesmanager import SeriesManager
1617
from .series import SERIES_ANALYSED_STATUS_DONE
1718
from .serverstate import ServerState
@@ -42,7 +43,10 @@ def __init__(self, rid, job_type, series_name, job_data=None, send_at=None, erro
4243
def to_dict(cls, job):
4344
resp = {}
4445
for slot in cls.__slots__:
45-
resp[slot] = getattr(job, slot)
46+
if isinstance(getattr(job, slot), datetime.datetime):
47+
resp[slot] = int(getattr(job, slot).timestamp())
48+
else:
49+
resp[slot] = getattr(job, slot)
4650
return resp
4751

4852
@classmethod
@@ -268,7 +272,8 @@ async def _set_job_failed(cls, job, error):
268272
if job is not None:
269273
job.error = error
270274
await cls._cancel_jobs_for_series(job.series_name)
271-
cls._active_jobs.remove(job)
275+
if job in cls._active_jobs:
276+
cls._active_jobs.remove(job)
272277
del cls._active_jobs_index[job.rid]
273278
cls._failed_jobs.append(job)
274279

@@ -420,7 +425,7 @@ async def save_to_disk(cls):
420425
try:
421426
job_data = {
422427
'next_job_id': cls._next_job_id,
423-
'open_jobs': [EnodoJob.to_dict(job) for job in cls._open_jobs],
428+
# 'open_jobs': [EnodoJob.to_dict(job) for job in cls._open_jobs],
424429
'failed_jobs': [EnodoJob.to_dict(job) for job in cls._failed_jobs],
425430
}
426431
f = open(Config.jobs_save_path, "w")
@@ -449,16 +454,16 @@ async def load_from_disk(cls):
449454
if isinstance(data, dict):
450455
if 'next_job_id' in data:
451456
cls._next_job_id = int(data.get('next_job_id'))
452-
if 'open_jobs' in data:
453-
loaded_open_jobs += len(data.get('open_jobs'))
454-
cls._open_jobs = [EnodoJob.from_dict(job_data) for job_data in data.get('open_jobs')]
457+
# if 'open_jobs' in data:
458+
# loaded_open_jobs += len(data.get('open_jobs'))
459+
# cls._open_jobs = [EnodoJob.from_dict(job_data) for job_data in data.get('open_jobs')]
455460
if 'failed_jobs' in data:
456461
loaded_failed_jobs += len(data.get('failed_jobs'))
457462
cls._failed_jobs = [EnodoJob.from_dict(job_data) for job_data in data.get('failed_jobs')]
458463

459464
await cls._build_index()
460465

461466
logging.info(
462-
f'Loaded {loaded_open_jobs} open jobs and {loaded_failed_jobs} failed jobs from disk')
467+
f'Loaded {loaded_failed_jobs} failed jobs from disk')
463468

464469
cls._unlock()

lib/events/enodoeventmanager.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import json
99
from jinja2 import Environment, PackageLoader
1010

11-
from lib.config.config import Config
11+
from lib.config import Config
1212
from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_ADD, SUBSCRIPTION_CHANGE_TYPE_UPDATE, SUBSCRIPTION_CHANGE_TYPE_DELETE
1313
from lib.serverstate import ServerState
1414

lib/series/series.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def is_ignored(self):
4747
return EnodoJobManager.has_series_failed_jobs(self.name)
4848

4949
async def get_model(self, job_type):
50-
return self.series_config.get_model_for_job(job_type)
50+
return self.series_config.get_config_for_job(job_type).model
5151

5252
async def get_job_status(self, job_type):
5353
status = self.series_job_statuses.get(job_type)
@@ -58,6 +58,12 @@ async def get_job_status(self, job_type):
5858
async def set_job_status(self, job_type, status):
5959
self.series_job_statuses[job_type] = status
6060

61+
def job_activated(self, job_type):
62+
job_config = self.series_config.job_config.get(job_type)
63+
if job_config is None or not job_config.activated:
64+
return False
65+
return True
66+
6167
async def get_datapoints_count(self):
6268
return self._datapoint_count
6369

@@ -71,13 +77,13 @@ async def add_to_datapoints_count(self, add_to_count):
7177
self._datapoint_count += add_to_count
7278

7379
async def schedule_job(self, job_type):
74-
if job_type in self.series_config.job_models:
75-
if job_type in self._job_schedule and job_type in self.series_config.job_schedule:
80+
if job_type in self.series_config.job_config:
81+
if job_type in self._job_schedule:
7682
if self._job_schedule[job_type] <= self._datapoint_count:
77-
self._job_schedule[job_type] = self._datapoint_count + self.series_config.job_schedule[job_type]
83+
self._job_schedule[job_type] = self._datapoint_count + self.series_config.job_config[job_type].job_schedule
7884

7985
async def is_job_due(self, job_type):
80-
if job_type not in self.series_config.job_models:
86+
if job_type not in self.series_config.job_config:
8187
return False
8288

8389
if self.series_job_statuses.get(job_type) != JOB_STATUS_DONE:

lib/series/seriesmanager.py

+10-10
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import qpack
88

9-
from lib.config.config import Config
9+
from lib.config import Config
1010
from lib.events import EnodoEvent
1111
from lib.events.enodoeventmanager import ENODO_EVENT_ANOMALY_DETECTED, EnodoEventManager
1212
from lib.serverstate import ServerState
@@ -40,8 +40,8 @@ async def series_changed(cls, change_type, series_name):
4040
@classmethod
4141
async def add_series(cls, series):
4242
if series.get('name') not in cls._series:
43-
if await does_series_exist(ServerState.siridb_data_client, series.get('name')):
44-
collected_datapoints = await query_series_datapoint_count(ServerState.siridb_data_client, series.get('name'))
43+
if await does_series_exist(ServerState.get_siridb_data_conn(), series.get('name')):
44+
collected_datapoints = await query_series_datapoint_count(ServerState.get_siridb_data_conn(), series.get('name'))
4545
if collected_datapoints:
4646
series['datapoint_count'] = collected_datapoints
4747
cls._series[series.get('name')] = Series.from_dict(series)
@@ -67,7 +67,7 @@ def get_listener_info(cls):
6767
data = {}
6868
for series_name in cls._series:
6969
data[series_name] = {
70-
"realtime": JOB_TYPE_DETECT_ANOMALIES_FOR_SERIES_REALTIME in cls._series[series_name].series_config.job_models
70+
"realtime": JOB_TYPE_DETECT_ANOMALIES_FOR_SERIES_REALTIME in cls._series[series_name].series_config.job_config
7171
}
7272
return data
7373

@@ -106,8 +106,8 @@ async def add_to_datapoint_counter(cls, series_name, value):
106106
async def add_forecast_to_series(cls, series_name, points):
107107
series = cls._series.get(series_name, None)
108108
if series is not None:
109-
await drop_series(ServerState.siridb_forecast_client, f'forecast_{series_name}')
110-
await insert_points(ServerState.siridb_forecast_client, f'forecast_{series_name}', points)
109+
await drop_series(ServerState.get_siridb_forecast_conn(), f'forecast_{series_name}')
110+
await insert_points(ServerState.get_siridb_forecast_conn(), f'forecast_{series_name}', points)
111111
await series.set_job_status(JOB_TYPE_FORECAST_SERIES, JOB_STATUS_DONE)
112112

113113
# date_1 = datetime.datetime.now()
@@ -122,20 +122,20 @@ async def add_anomalies_to_series(cls, series_name, points):
122122
event = EnodoEvent('Anomaly detected!', f'{len(points)} anomalies detected for series {series_name}',
123123
ENODO_EVENT_ANOMALY_DETECTED)
124124
await EnodoEventManager.handle_event(event)
125-
await drop_series(ServerState.siridb_forecast_client, f'anomalies_{series_name}')
126-
await insert_points(ServerState.siridb_forecast_client, f'anomalies_{series_name}', points)
125+
await drop_series(ServerState.get_siridb_forecast_conn(), f'anomalies_{series_name}')
126+
await insert_points(ServerState.get_siridb_forecast_conn(), f'anomalies_{series_name}', points)
127127
await series.set_job_status(JOB_TYPE_DETECT_ANOMALIES_FOR_SERIES, JOB_STATUS_DONE)
128128

129129
@classmethod
130130
async def get_series_forecast(cls, series_name):
131-
values = await query_series_data(ServerState.siridb_forecast_client, f'forecast_{series_name}')
131+
values = await query_series_data(ServerState.get_siridb_forecast_conn(), f'forecast_{series_name}')
132132
if values is not None:
133133
return values.get(f'forecast_{series_name}', None)
134134
return None
135135

136136
@classmethod
137137
async def get_series_anomalies(cls, series_name):
138-
values = await query_series_data(ServerState.siridb_forecast_client, f'anomalies_{series_name}')
138+
values = await query_series_data(ServerState.get_siridb_forecast_conn(), f'anomalies_{series_name}')
139139
if values is not None:
140140
return values.get(f'anomalies_{series_name}', None)
141141
return None

lib/serverstate.py

+89-26
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
from siridb.connector import SiriDBClient
2+
from lib.config import Config
3+
from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_INITIAL
24

35

46
class ServerState:
@@ -7,43 +9,104 @@ class ServerState:
79
siridb_data_client = None
810
siridb_forecast_client = None
911
tasks_last_runs = {}
12+
siridb_conn_status = {}
1013

1114
@classmethod
12-
async def async_setup(cls, sio, siridb_data_username,
13-
siridb_data_password,
14-
siridb_data_dbname,
15-
siridb_data_hostlist,
16-
siridb_forecast_username,
17-
siridb_forecast_password,
18-
siridb_forecast_dbname,
19-
siridb_forecast_hostlist):
15+
async def async_setup(cls, sio):
2016
cls.running = True
2117
cls.sio = sio
22-
23-
cls.siridb_data_client = SiriDBClient(
24-
username=siridb_data_username,
25-
password=siridb_data_password,
26-
dbname=siridb_data_dbname,
27-
hostlist=siridb_data_hostlist, # Multiple connections are supported
28-
keepalive=True)
29-
await cls.siridb_data_client.connect()
30-
cls.siridb_forecast_client = SiriDBClient(
31-
username=siridb_forecast_username,
32-
password=siridb_forecast_password,
33-
dbname=siridb_forecast_dbname,
34-
hostlist=siridb_forecast_hostlist, # Multiple connections are supported
35-
keepalive=True)
36-
await cls.siridb_forecast_client.connect()
3718

19+
await cls.setup_siridb_connection()
3820

3921
cls.tasks_last_runs = {
4022
'watch_series': None,
4123
'save_to_disk': None,
4224
'check_jobs': None,
43-
'cleanup_clients': None
25+
'manage_connections': None
26+
}
27+
28+
cls.siridb_conn_status = {
29+
'data_conn': False,
30+
'analysis_conn': False
4431
}
4532

33+
await cls.refresh_siridb_status()
34+
35+
@classmethod
36+
def _siridb_config_equal(cls, a, b):
37+
if a.get('username') != b.get('username'):
38+
return False
39+
if a.get('password') != b.get('password'):
40+
return False
41+
if a.get('dbname') != b.get('dbname'):
42+
return False
43+
if a.get('hostlist')[0] != b.get('hostlist')[0]:
44+
return False
45+
46+
return True
47+
48+
@classmethod
49+
async def setup_siridb_connection(cls):
50+
siridb_data_config, siridb_forecast_config = Config.get_siridb_settings()
51+
52+
if cls.siridb_data_client is not None:
53+
cls.stop()
54+
55+
cls.siridb_data_client = SiriDBClient(
56+
**siridb_data_config,
57+
keepalive=True)
58+
await cls.siridb_data_client.connect()
59+
if not cls._siridb_config_equal(siridb_data_config, siridb_forecast_config):
60+
if cls.siridb_forecast_client is not None:
61+
cls.siridb_forecast_client.close()
62+
cls.siridb_forecast_client = SiriDBClient(
63+
**siridb_forecast_config,
64+
keepalive=True)
65+
await cls.siridb_forecast_client.connect()
66+
elif cls.siridb_forecast_client is not None:
67+
cls.siridb_forecast_client.close()
68+
cls.siridb_forecast_client = None
69+
70+
await cls.refresh_siridb_status()
71+
72+
@classmethod
73+
def get_siridb_data_conn(cls):
74+
return cls.siridb_data_client
75+
76+
@classmethod
77+
def get_siridb_forecast_conn(cls):
78+
if cls.siridb_forecast_client is None:
79+
return cls.siridb_data_client
80+
return cls.siridb_forecast_client
81+
82+
83+
@classmethod
84+
def get_siridb_data_conn_status(cls):
85+
return cls.siridb_data_client.connected
86+
87+
@classmethod
88+
def get_siridb_forecast_conn_status(cls):
89+
if cls.siridb_forecast_client is None:
90+
return cls.siridb_data_client.connected
91+
return cls.siridb_forecast_client.connected
92+
93+
@classmethod
94+
async def refresh_siridb_status(cls):
95+
status = {}
96+
status['data_conn'] = cls.get_siridb_data_conn_status()
97+
status['analysis_conn'] = cls.get_siridb_forecast_conn_status()
98+
99+
if status != cls.siridb_conn_status:
100+
cls.siridb_conn_status = status
101+
await cls.sio.emit('update', {
102+
'resource': 'siridb_status',
103+
'updateType': SUBSCRIPTION_CHANGE_TYPE_INITIAL,
104+
'resourceData': cls.siridb_conn_status
105+
}, room='siridb_status_updates')
106+
46107
@classmethod
47108
def stop(cls):
48-
cls.siridb_data_client.close()
49-
cls.siridb_forecast_client.close()
109+
if cls.siridb_data_client is not None:
110+
cls.siridb_data_client.close()
111+
if cls.siridb_forecast_client is not None:
112+
cls.siridb_forecast_client.close()

0 commit comments

Comments
 (0)