Skip to content

Commit

Permalink
Merge pull request #35 from SiriDB/development
Browse files Browse the repository at this point in the history
bugfixes
  • Loading branch information
joente authored May 3, 2022
2 parents d089700 + 2a44b4b commit f127def
Show file tree
Hide file tree
Showing 11 changed files with 90 additions and 32 deletions.
10 changes: 9 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased] - yyyy-mm-dd
## [0.1.0-beta3.2.3] - 2022-05-02

### Added
- Implemented use_max_points property in job configs

### Fixed
- Bug with worker is_going_busy status on reconnect
- Fixed return http status on not found when fetching series details
- Fixed issue with siridb connect when changing settings

## [0.1.0-beta3.2.2] - 2022-04-29

Expand Down
7 changes: 3 additions & 4 deletions lib/api/apihandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ async def get_single_monitored_series(cls, request):
_type_: _description_
"""
series_name = unquote(request.match_info['series_name'])

return web.json_response(
data=await BaseHandler.resp_get_single_monitored_series(
series_name), dumps=safe_json_dumps)
data, status = await BaseHandler.resp_get_single_monitored_series(
series_name)
return web.json_response(data, dumps=safe_json_dumps, status=status)

@classmethod
@EnodoAuth.auth.required
Expand Down
3 changes: 3 additions & 0 deletions lib/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ def get_siridb_settings(cls):

@classmethod
def update_settings(cls, section, key, value):
if cls._settings[section][key] == value:
return True
cls._settings[section][key] = value
return False

@classmethod
def write_settings(cls):
Expand Down
4 changes: 3 additions & 1 deletion lib/enodojobmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ async def receive_job_result(cls, writer, packet_type,
series.series_characteristics = \
job_response.get('characteristics')
series.state.health = job_response.get('health')
series.state.interval = job_response.get('interval')
await series.set_job_status(
job.job_config.config_name, JOB_STATUS_DONE)
await series.schedule_job(job.job_config.config_name)
Expand Down Expand Up @@ -481,7 +482,8 @@ async def _send_worker_job_request(cls, worker, job):
job_data = EnodoJobRequestDataModel(
job_id=job.rid, job_config=job.job_config,
series_name=job.series_name,
series_config=series.series_config)
series_config=series.series_config,
series_state=series.state)
data = qpack.packb(job_data.serialize())
header = create_header(len(data), WORKER_JOB, 0)
worker.writer.write(header + data)
Expand Down
57 changes: 39 additions & 18 deletions lib/serverstate.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from asyncio import Lock
import logging

from aiojobs import create_scheduler
from siridb.connector import SiriDBClient
from lib.config import Config
from lib.siridb.siridb import query_time_unit
from lib.socketio import SUBSCRIPTION_CHANGE_TYPE_INITIAL


Expand All @@ -10,8 +14,11 @@ class ServerState:
sio = None
siridb_data_client = None
siridb_output_client = None
siridb_data_client_lock = None
siridb_output_client_lock = None
tasks_last_runs = {}
siridb_conn_status = {}
siridb_ts_unit = None
readiness = None
scheduler = None

Expand All @@ -21,8 +28,11 @@ async def async_setup(cls, sio):
cls.work_queue = True
cls.readiness = False
cls.sio = sio
cls.siridb_data_client_lock = Lock()
cls.siridb_output_client_lock = Lock()

await cls.setup_siridb_connection()
await cls.setup_siridb_data_connection()
await cls.setup_siridb_output_connection()

cls.tasks_last_runs = {
'watch_series': None,
Expand Down Expand Up @@ -58,27 +68,36 @@ def _siridb_config_equal(cls, a, b):
return True

@classmethod
async def setup_siridb_connection(cls):
async def setup_siridb_data_connection(cls):
data_config, output_config = Config.get_siridb_settings()

if cls.siridb_data_client is not None:
cls.stop()

cls.siridb_data_client = SiriDBClient(
**data_config,
keepalive=True)
await cls.siridb_data_client.connect()
if not cls._siridb_config_equal(
data_config, output_config):
logging.info('Setting up SiriDB data Connection')
async with cls.siridb_data_client_lock:
if cls.siridb_data_client is not None:
cls.siridb_data_client.close()

cls.siridb_data_client = SiriDBClient(
**data_config,
keepalive=True)
await cls.siridb_data_client.connect()

await cls.refresh_siridb_status()

@classmethod
async def setup_siridb_output_connection(cls):
data_config, output_config = Config.get_siridb_settings()

logging.info('Setting up SiriDB output Connection')
async with cls.siridb_output_client_lock:
if cls.siridb_output_client is not None:
cls.siridb_output_client.close()
cls.siridb_output_client = SiriDBClient(
**output_config,
keepalive=True)
await cls.siridb_output_client.connect()
elif cls.siridb_output_client is not None:
cls.siridb_output_client.close()
cls.siridb_output_client = None
cls.siridb_output_client = None

if not cls._siridb_config_equal(data_config, output_config):
cls.siridb_output_client = SiriDBClient(
**output_config,
keepalive=True)
await cls.siridb_output_client.connect()

await cls.refresh_siridb_status()

Expand Down Expand Up @@ -106,6 +125,8 @@ def get_siridb_output_conn_status(cls):
async def refresh_siridb_status(cls):
status = {}
status['data_conn'] = cls.get_siridb_data_conn_status()
if status['data_conn']:
cls.siridb_ts_unit = await query_time_unit(cls.siridb_data_client)
status['analysis_conn'] = cls.get_siridb_output_conn_status()

if status != cls.siridb_conn_status:
Expand Down
15 changes: 14 additions & 1 deletion lib/siridb/siridb.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,20 @@ async def does_series_exist(siridb_client, series_name):
return exists


# @classmethod
async def query_time_unit(siridb_client):
result = None
try:
result = await siridb_client.query(
f'show time_precision')
except (QueryError, InsertError, ServerError, PoolError,
AuthenticationError, UserAuthError) as e:
print("Connection problem with SiriDB server")
pass
if result is None:
return None
return result["data"][0]["value"]


async def query_series_data(siridb_client, series_name, selector="*"):
result = None
try:
Expand Down
5 changes: 5 additions & 0 deletions lib/socket/clientmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ def set_config(self, worker_config):
def get_config(self):
return self.worker_config

async def reconnected(self, ip_address, writer):
await super().reconnected(ip_address, writer)
self.busy = False
self.is_going_busy = False

def to_dict(self):
base_dict = super().to_dict()
extra_dict = {
Expand Down
2 changes: 1 addition & 1 deletion lib/socketio/socketiohandlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def _get_all_series(cls, sid, regex_filter, event):
@socketio_auth_required
async def get_series_details(cls, sid, data, event):
series_name = data.get('series_name')
resp = await BaseHandler.resp_get_single_monitored_series(
resp, status = await BaseHandler.resp_get_single_monitored_series(
series_name)
return safe_json_dumps(resp)

Expand Down
15 changes: 11 additions & 4 deletions lib/webserver/basehandler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from aiohttp import web
import logging

from siridb.connector.lib.exceptions import QueryError, InsertError, \
ServerError, PoolError, AuthenticationError, UserAuthError
Expand Down Expand Up @@ -49,9 +50,9 @@ async def resp_get_single_monitored_series(cls, series_name):
"""
series = await SeriesManager.get_series(series_name)
if series is None:
return web.json_response(data={'data': ''}, status=404)
return {'data': ''}, 404
series_data = series.to_dict()
return {'data': series_data}
return {'data': series_data}, 200

@classmethod
async def resp_get_series_forecasts(cls, series_name):
Expand Down Expand Up @@ -319,13 +320,19 @@ async def resp_set_config(cls, data):
"""
section = data.get('section')
keys_and_values = data.get('entries')
changed = False
for key in keys_and_values:
if Config.is_runtime_configurable(section, key):
Config.update_settings(
changed = changed or Config.update_settings(
section, key, keys_and_values[key])
Config.write_settings()
Config.setup_settings_variables()
await ServerState.setup_siridb_connection()

if changed and section == "siridb":
await ServerState.setup_siridb_data_connection()
elif changed and section == "siridb_output":
await ServerState.setup_siridb_output_connection()

return {'data': True}

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ aiohttp_basicauth==1.0.0
aiohttp_cors==0.7.0
jinja2==3.1.0
psutil==5.9.0
python-enodo==0.2.14
python-enodo==0.2.15
python-socketio==5.5.2
qpack==0.0.19
siridb-connector==2.0.8
Expand Down
2 changes: 1 addition & 1 deletion version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '0.1.0-beta3.2.1'
VERSION = '0.1.0-beta3.2.3'

0 comments on commit f127def

Please sign in to comment.