pylint errors fixed; prints on provider start
kmodexc committed Apr 29, 2022
1 parent 9cff7ae commit 69728a2
Showing 1 changed file with 44 additions and 26 deletions.
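
For orientation before the diff: the two generator helpers touched below drain an aioprocessing queue until a None sentinel arrives, and the pylint fix there is the no-else-break style of yielding directly after the break test. A minimal, self-contained sketch of that pattern follows; it uses the standard-library queue instead of aioprocessing, and the function name and demo values are illustrative, not taken from the repository.

```python
import queue

def demo_queue_generator(q: queue.Queue):
    """Drain a queue until a None sentinel arrives, yielding each item.

    Same shape as pytrackunit's queue_generator after this commit:
    yield follows the break test directly, with no else: branch
    (pylint's no-else-break).
    """
    while True:
        obj = q.get()
        if obj is None:
            break
        yield obj

q = queue.Queue()
for item in ({"id": "A"}, {"id": "B"}, None):   # None ends the stream
    q.put(item)
print(list(demo_queue_generator(q)))            # [{'id': 'A'}, {'id': 'B'}]
```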
70 changes: 44 additions & 26 deletions pytrackunit/trackunit.py
@@ -3,10 +3,12 @@
import json
import os.path
import asyncio
import tqdm
from typing import Callable, Iterable

import tqdm
import aioprocessing
from pytrackunit.tuiter import TuIter

from .tuiter import TuIter
from .tucache import TuCache
from .sqlcache import SqlCache
from .helper import SecureString
@@ -18,17 +20,15 @@ async def async_queue_generator(queue : aioprocessing.AioQueue):
obj = await queue.coro_get()
if obj is None:
break
else:
yield obj
yield obj

def queue_generator(queue : aioprocessing.AioQueue):
"""converts the queue into an generator"""
while True:
obj = queue.get()
if obj is None:
break
else:
yield obj
yield obj

async def async_collect_data(
queue : aioprocessing.AioQueue,
@@ -40,38 +40,43 @@ async def async_collect_data(
If set it will call update of the progress_bar after each iteration.
"""


async for data_list, meta in tuit:
proc_data = f_process(data_list,meta)
for x in proc_data:
if x is not None:
await queue.coro_put(x)
for _x in proc_data:
if _x is not None:
await queue.coro_put(_x)
if progress_bar is not None:
progress_bar.update()


async def async_provider_process(
settings : dict,
queue : aioprocessing.AioQueue,
veh_id_list : Iterable[str],
tdelta : int,
str_function : str) -> None:
"""
puts the data it gets from trackunit and puts it in the given queue
function can be "error" "history" "candata"
"""
tu = TrackUnit(**settings)
if settings['verbose']:
print("Provider process started")

_tu = TrackUnit(**settings)

if str_function == "history":
f_function = tu.cache.get_history
f_function = _tu.cache.get_history
elif str_function == "candata":
f_function = tu.cache.get_candata
f_function = _tu.cache.get_candata
else:
f_function = tu.cache.get_faults
f_function = _tu.cache.get_faults

await tu.async_get_multi_general_queue(queue,f_function,veh_id_list,tdelta)
await _tu.async_get_multi_general_queue(queue,f_function,veh_id_list,tdelta)

def provider_process(
settings : dict,
queue : aioprocessing.AioQueue,
veh_id_list : Iterable[str],
tdelta : int,
str_function : str) -> None:
@@ -117,15 +122,13 @@ async def async_get_multi_general_queue(
queue : aioprocessing.AioQueue,
func : Callable,
idlist : Iterable[str],
tdelta : int,
f_process : Callable[[Iterable[dict],dict],Iterable]=None) -> None:
tdelta : int) -> None:
"""
returns the data of a list of vehicles with the ids provided in idlist.
f_process can be specified to process slices of data. f_process returns a list
"""

if f_process is None:
f_process = lambda x, meta: zip(x,[meta.get('id','no-id-found')])
f_process = lambda x, meta: zip(x,[meta.get('id','no-id-found')])

iterators = []

@@ -137,13 +140,25 @@

if self.settings['use_progress_bar']:
pbar = tqdm.tqdm(total=globlen)
else:
pbar = None

zipped_list = list(zip(
len(iterators)*[queue],
iterators,
len(iterators)*[f_process],
len(iterators)*[pbar]))

zipped_list = list(zip(len(iterators)*[queue],iterators,len(iterators)*[f_process],len(iterators)*[pbar]))

tasks = list(map(lambda x: async_collect_data(*x),zipped_list))

if self.settings['verbose']:
print("async_get_multi_general_queue created tasks")

await asyncio.gather(*tasks)

if self.settings['verbose']:
print("async_get_multi_general_queue tasks finished")

await queue.coro_put(None)

def get_multi_general(
@@ -157,12 +172,15 @@ def get_multi_general(
f_process can be specified to process slices of data. f_process returns a list
"""
queue = aioprocessing.AioQueue(self.settings['queue_size'])
p = aioprocessing.AioProcess(target=provider_process,args=(self.settings,queue,idlist,tdelta,str_process))
p.start()
worker = aioprocessing.AioProcess(
target=provider_process,
args=(self.settings,queue,idlist,tdelta,str_process))
# pylint: disable=no-member
worker.start()
# pylint: enable=no-member
if use_async_generator:
return async_queue_generator(queue)
else:
return queue_generator(queue)
return queue_generator(queue)

async def async_get_unitlist(self,_type=None,sort_by_hours=True):
"""unitList method"""
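
The full signature of get_multi_general is collapsed in the hunk above, so the consumer-side sketch below is pieced together from the visible body (the settings keys, the provider_process arguments idlist, tdelta and str_process, and the use_async_generator flag). The keyword names, argument order, and constructor arguments are assumptions, not the documented pytrackunit API.

```python
"""Hypothetical consumer of TrackUnit.get_multi_general after this commit.

Every name below that is not visible in the diff (keyword arguments of
get_multi_general, TrackUnit constructor arguments) is an assumption.
"""
import asyncio
from pytrackunit.trackunit import TrackUnit

# Assumed constructor call; real usage will typically also need an API key,
# and 'verbose' mirrors the settings['verbose'] flag checked in the diff.
tu = TrackUnit(verbose=True)

# Synchronous path: a provider process is spawned (printing
# "Provider process started" when verbose) and a plain generator drains the
# shared AioQueue until the None sentinel put by async_get_multi_general_queue.
for record, veh_id in tu.get_multi_general(
        str_process="history",     # "error", "history" or "candata" per the docstring
        idlist=["123456"],         # vehicle ids
        tdelta=30,                 # assumed to be a span in days
        use_async_generator=False):
    # the default f_process pairs each data slice with its meta id
    print(veh_id, record)

# Asynchronous path: with use_async_generator=True the same call returns an
# async generator that awaits queue.coro_get() instead of blocking.
async def main():
    async for record, veh_id in tu.get_multi_general(
            str_process="candata", idlist=["123456"], tdelta=7,
            use_async_generator=True):
        print(veh_id, record)

asyncio.run(main())
```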
