Skip to content
This repository has been archived by the owner on Oct 22, 2024. It is now read-only.

Commit

Permalink
Worker: discard task updates when task is assigned to another worker
Browse files Browse the repository at this point in the history
The Manager will return a 409 Conflict status for updates to tasks that
have been assigned to another worker. This change prevents the worker
from trying to push these updates indefinitely to the Manager.
  • Loading branch information
sybrenstuvel committed Feb 1, 2017
1 parent 2c4f80a commit 243f71d
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,14 @@ async def flush(self, *, loop: asyncio.AbstractEventLoop) -> bool:

self._log.info('Pushing task update to Manager')
resp = await self.manager.post(url, json=payload, loop=loop)
resp.raise_for_status()
self._log.debug('Master accepted pushed update.')
if resp.status_code == 409:
# The task was assigned to another worker, so we're not allowed to
# push updates for it. We have to un-queue this update, as it will
# never be accepted.
self._log.warning('Task was assigned to another worker, discarding update.')
else:
resp.raise_for_status()
self._log.debug('Master accepted pushed update.')
self._unqueue(rowid)

if queue_is_empty:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,39 @@ async def push_callback(url, *, json, loop):
self.assertEqual(received_url, '/push/there')
self.assertEqual(received_payload, payload)
self.assertEqual(received_loop, self.asyncio_loop)

def test_conflict(self):
"""A 409 Conflict response should discard a queued task update.
"""

from mock_responses import JsonResponse, EmptyResponse

# Try different value types
payload = {'key': 'value',
'sub': {'some': 13,
'values': datetime.datetime.now()}}

tries = 0

async def push_callback(url, *, json, loop):
nonlocal tries
tries += 1
self.shutdown_future.cancel()
return JsonResponse({}, status_code=409)

self.manager.post.side_effect = push_callback

self.tuqueue.queue('/push/here', payload, loop=self.asyncio_loop)

# Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling
# the actual payload.
self.asyncio_loop.run_until_complete(
asyncio.wait_for(
self.tuqueue.work(loop=self.asyncio_loop),
timeout=2
)
)

# There should only be one attempt at delivering this payload.
self.assertEqual(1, tries)
self.assertEqual([], list(self.tuqueue._queue()))

0 comments on commit 243f71d

Please sign in to comment.