diff --git a/pytrackunit/trackunit.py b/pytrackunit/trackunit.py
index d8b0724..4562461 100644
--- a/pytrackunit/trackunit.py
+++ b/pytrackunit/trackunit.py
@@ -3,10 +3,12 @@
 import json
 import os.path
 import asyncio
-import tqdm
 from typing import Callable, Iterable
+
+import tqdm
 import aioprocessing
-from pytrackunit.tuiter import TuIter
+
+from .tuiter import TuIter
 from .tucache import TuCache
 from .sqlcache import SqlCache
 from .helper import SecureString
@@ -18,8 +20,7 @@ async def async_queue_generator(queue : aioprocessing.AioQueue):
         obj = await queue.coro_get()
         if obj is None:
             break
-        else:
-            yield obj
+        yield obj
 
 def queue_generator(queue : aioprocessing.AioQueue):
     """converts the queue into an generator"""
@@ -27,8 +28,7 @@ def queue_generator(queue : aioprocessing.AioQueue):
         obj = queue.get()
         if obj is None:
             break
-        else:
-            yield obj
+        yield obj
 
 async def async_collect_data(
         queue : aioprocessing.AioQueue,
@@ -40,17 +40,19 @@ async def async_collect_data(
 
     If set it will call update of the progress_bar after each iteration.
     """
+
     async for data_list, meta in tuit:
         proc_data = f_process(data_list,meta)
-        for x in proc_data:
-            if x is not None:
-                await queue.coro_put(x)
+        for _x in proc_data:
+            if _x is not None:
+                await queue.coro_put(_x)
         if progress_bar is not None:
             progress_bar.update()
 
+
 async def async_provider_process(
         settings : dict,
-        queue : aioprocessing.AioQueue, 
+        queue : aioprocessing.AioQueue,
         veh_id_list : Iterable[str],
         tdelta : int,
         str_function : str) -> None:
@@ -58,20 +60,23 @@ async def async_provider_process(
    puts the data it gets from trackunit and puts it in the given queue
    function can be "error" "history" "candata"
    """
-    tu = TrackUnit(**settings)
+    if settings['verbose']:
+        print("Provider process started")
+
+    _tu = TrackUnit(**settings)
     if str_function == "history":
-        f_function = tu.cache.get_history
+        f_function = _tu.cache.get_history
     elif str_function == "candata":
-        f_function = tu.cache.get_candata
+        f_function = _tu.cache.get_candata
     else:
-        f_function = tu.cache.get_faults
+        f_function = _tu.cache.get_faults
 
-    await tu.async_get_multi_general_queue(queue,f_function,veh_id_list,tdelta)
+    await _tu.async_get_multi_general_queue(queue,f_function,veh_id_list,tdelta)
 
 
 def provider_process(
         settings : dict,
-        queue : aioprocessing.AioQueue, 
+        queue : aioprocessing.AioQueue,
         veh_id_list : Iterable[str],
         tdelta : int,
         str_function : str) -> None:
@@ -117,15 +122,13 @@ async def async_get_multi_general_queue(
         queue : aioprocessing.AioQueue,
         func : Callable,
         idlist : Iterable[str],
-        tdelta : int,
-        f_process : Callable[[Iterable[dict],dict],Iterable]=None) -> None:
+        tdelta : int) -> None:
         """
        returns the data of a list of vehicles with the ids provided in idlist.
        f_process can be specified to process slices of data. f_process returns a list
        """
 
-        if f_process is None:
-            f_process = lambda x, meta: zip(x,[meta.get('id','no-id-found')])
+        f_process = lambda x, meta: zip(x,[meta.get('id','no-id-found')])
 
         iterators = []
 
@@ -137,13 +140,25 @@ async def async_get_multi_general_queue(
 
         if self.settings['use_progress_bar']:
             pbar = tqdm.tqdm(total=globlen)
+        else:
+            pbar = None
+
+        zipped_list = list(zip(
+            len(iterators)*[queue],
+            iterators,
+            len(iterators)*[f_process],
+            len(iterators)*[pbar]))
 
-        zipped_list = list(zip(len(iterators)*[queue],iterators,len(iterators)*[f_process],len(iterators)*[pbar]))
-
         tasks = list(map(lambda x: async_collect_data(*x),zipped_list))
 
+        if self.settings['verbose']:
+            print("async_get_multi_general_queue created tasks")
+
         await asyncio.gather(*tasks)
 
+        if self.settings['verbose']:
+            print("async_get_multi_general_queue tasks finished")
+
         await queue.coro_put(None)
 
     def get_multi_general(
@@ -157,12 +172,15 @@ def get_multi_general(
        f_process can be specified to process slices of data. f_process returns a list
        """
         queue = aioprocessing.AioQueue(self.settings['queue_size'])
-        p = aioprocessing.AioProcess(target=provider_process,args=(self.settings,queue,idlist,tdelta,str_process))
-        p.start()
+        worker = aioprocessing.AioProcess(
+            target=provider_process,
+            args=(self.settings,queue,idlist,tdelta,str_process))
+        # pylint: disable=no-member
+        worker.start()
+        # pylint: enable=no-member
         if use_async_generator:
             return async_queue_generator(queue)
-        else:
-            return queue_generator(queue)
+        return queue_generator(queue)
 
     async def async_get_unitlist(self,_type=None,sort_by_hours=True):
         """unitList method"""
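
Reviewer note (not part of the patch): a minimal sketch of how the None-sentinel contract between the provider process and the two queue generators is meant to be consumed. Only queue_generator and async_queue_generator come from the patched module (so the patched pytrackunit.trackunit must be importable); the fake_provider helper, the sample payloads, and running producer and consumer in a single process are illustrative assumptions rather than how provider_process actually runs.

import asyncio
import aioprocessing
from pytrackunit.trackunit import queue_generator, async_queue_generator

def fake_provider(queue):
    """Stand-in for provider_process: push a few items, then the None sentinel."""
    for item in ({"id": "A"}, {"id": "B"}):
        queue.put(item)
    queue.put(None)

def sync_demo():
    queue = aioprocessing.AioQueue()
    fake_provider(queue)
    # queue_generator yields items until it reads the None sentinel
    return list(queue_generator(queue))

async def async_demo():
    queue = aioprocessing.AioQueue()
    fake_provider(queue)
    # async_queue_generator awaits coro_get() and stops on the same sentinel
    return [item async for item in async_queue_generator(queue)]

print(sync_demo())
print(asyncio.run(async_demo()))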