Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
timoj committed Nov 25, 2019
1 parent b5ec9c9 commit d08895a
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 203 deletions.
133 changes: 0 additions & 133 deletions lib/socket/client.py

This file was deleted.

54 changes: 0 additions & 54 deletions lib/socket/package.py

This file was deleted.

31 changes: 15 additions & 16 deletions listener.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import asyncio
import configparser
import datetime
import json

from lib.siridb.pipeserver import PipeServer
from lib.socket.client import Client
from lib.socket.package import *
from enodo.client import Client
from enodo.client.package import *


class Listener:
Expand All @@ -15,9 +14,9 @@ def __init__(self, loop, config_path):
self._config = configparser.ConfigParser()
self._config.read(config_path)
self._series_to_watch = ()
self._serie_counter_updates = {}
self._serie_updates = {}
self._client = Client(loop, self._config['enodo']['hub_hostname'], int(self._config['enodo']['hub_port']),
'listener', heartbeat_interval=int(self._config['enodo']['heartbeat_interval']))
'listener', self._config['enodo']['token'], heartbeat_interval=int(self._config['enodo']['heartbeat_interval']))
self._client_run_task = None
self._updater_task = None
self._last_update = datetime.datetime.now()
Expand All @@ -43,26 +42,26 @@ async def _handle_pipe_data(self, data):
print("INCOMMING DATA")
for serie_name, values in data.items():
if serie_name in self._series_to_watch:
if serie_name in self._serie_counter_updates:
serie_counter = self._serie_counter_updates.get(serie_name)
self._serie_counter_updates[serie_name] = serie_counter + len(values)
print("h2")
if serie_name in self._serie_updates:
self._serie_updates[serie_name].extend(values)
else:
self._serie_counter_updates[serie_name] = len(values)

print(self._serie_counter_updates)
self._serie_updates[serie_name] = values

async def _updater(self):
while 1:
if (datetime.datetime.now() - self._last_update).total_seconds() > int(
self._config['enodo']['counter_update_interval']) and len(self._serie_counter_updates.keys()):
self._config['enodo']['counter_update_interval']) and len(self._serie_updates.keys()):
print("HERE")
await self._send_update()
self._last_update = datetime.datetime.now()
await asyncio.sleep(1)

async def _send_update(self):
update_encoded = json.dumps(self._serie_counter_updates).encode('utf-8')
self._serie_counter_updates = {}
await self._client.send_message(update_encoded, LISTENER_ADD_SERIE_COUNT)
print("SENDING UPDATE")
update_encoded = self._serie_updates
await self._client.send_message(update_encoded, LISTENER_NEW_SERIES_POINTS)
self._serie_updates = {}

async def start_listener(self):
await self._start_siridb_pipeserver()
Expand All @@ -74,7 +73,7 @@ async def start_listener(self):

async def _handle_update_series(self, data):
print("Received new list of series to watch")
self._series_to_watch = set(json.loads(data))
self._series_to_watch = set(data)

def close(self):
self._client_run_task.cancel()
Expand Down

0 comments on commit d08895a

Please sign in to comment.