Skip to content

Commit

Permalink
libasynctasks: fixed a bug with delayed tasks which caused them to la…
Browse files Browse the repository at this point in the history
…y in the task manager's queue forever
  • Loading branch information
cmarshall108 committed Sep 13, 2018
1 parent 968e19a commit e419a82
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions src/libasynctasks.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -189,12 +189,14 @@ class TaskScheduler(threading.Thread):

__slots__ = (
'_task_manager',
'_mutex_lock',
'_shutdown',
'_task_queue'
)

def __init__(self, task_manager):
self._task_manager = task_manager
self._mutex_lock = threading.RLock()

self._shutdown = False
self._task_queue = collections.deque()
Expand Down Expand Up @@ -300,21 +302,23 @@ class TaskScheduler(threading.Thread):
return
elif result == TASK_WAIT:
task.can_delay = True
# since this task is delayed, we need to keep it on it's original
# thread; otherwise our thread will be killed and the task will
# float in oblivion forever...
self._task_queue.append(task)
elif result == TASK_CONT:
task.can_delay = False
# add this task back to the task manager's queue so it can
# be ran on another thread if another one is available and has
# even less currently running tasks on it...
self._task_manager.task_queue.append(task)
else:
# check to see if we got any other result than what we
# are expecting, tasks do not return values when they are called
# like a normal function... So we should never expect this to be the case.
raise TaskError('Got invalid result <%r> when running task <%s>!' % (
result, task.name))

# the task want's to be placed back into the queue,
# instead of waiting for a new scheduler to be created,
# let's just add this task back to our own scheduler so we
# can save time between each execution...
self._task_manager.task_queue.append(task)

# finally let's check to see if we have any tasks remaining
# in the task queue, if we do not; then this means we have
# served our purpose and is no longer needed...
Expand All @@ -329,16 +333,20 @@ class TaskScheduler(threading.Thread):
"""

while not self._shutdown:
try:
self.update()
except (KeyboardInterrupt, SystemExit):
break
# acquire the mutex lock so we don't change the queue
# during it's iteration on the other thread...
with self._mutex_lock:
try:
self.update()
except (KeyboardInterrupt, SystemExit):
break

thread_wait()

self._task_manager.remove_scheduler(self)

def __del__(self):
self._mutex_lock = None
self._task_queue = None


Expand Down

0 comments on commit e419a82

Please sign in to comment.