From 243f71de28e61aa88593edf19c419c4ce57cc142 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Wed, 1 Feb 2017 12:28:38 +0100 Subject: [PATCH] Worker: discard task updates when task is assigned to another worker The Manager will return a 409 Conflict status for updates to tasks that have been assigned to another worker. This change prevents the worker from trying to push these updates indefinitely to the Manager. --- .../flamenco_worker/upstream_update_queue.py | 10 ++++-- .../tests/test_upstream_update_queue.py | 36 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py b/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py index 0f0ab50e..bffe3d6f 100644 --- a/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py +++ b/packages/flamenco-worker-python/flamenco_worker/upstream_update_queue.py @@ -126,8 +126,14 @@ async def flush(self, *, loop: asyncio.AbstractEventLoop) -> bool: self._log.info('Pushing task update to Manager') resp = await self.manager.post(url, json=payload, loop=loop) - resp.raise_for_status() - self._log.debug('Master accepted pushed update.') + if resp.status_code == 409: + # The task was assigned to another worker, so we're not allowed to + # push updates for it. We have to un-queue this update, as it will + # never be accepted. + self._log.warning('Task was assigned to another worker, discarding update.') + else: + resp.raise_for_status() + self._log.debug('Master accepted pushed update.') self._unqueue(rowid) if queue_is_empty: diff --git a/packages/flamenco-worker-python/tests/test_upstream_update_queue.py b/packages/flamenco-worker-python/tests/test_upstream_update_queue.py index b958776c..4f8083a6 100644 --- a/packages/flamenco-worker-python/tests/test_upstream_update_queue.py +++ b/packages/flamenco-worker-python/tests/test_upstream_update_queue.py @@ -147,3 +147,39 @@ async def push_callback(url, *, json, loop): self.assertEqual(received_url, '/push/there') self.assertEqual(received_payload, payload) self.assertEqual(received_loop, self.asyncio_loop) + + def test_conflict(self): + """A 409 Conflict response should discard a queued task update. + """ + + from mock_responses import JsonResponse, EmptyResponse + + # Try different value types + payload = {'key': 'value', + 'sub': {'some': 13, + 'values': datetime.datetime.now()}} + + tries = 0 + + async def push_callback(url, *, json, loop): + nonlocal tries + tries += 1 + self.shutdown_future.cancel() + return JsonResponse({}, status_code=409) + + self.manager.post.side_effect = push_callback + + self.tuqueue.queue('/push/here', payload, loop=self.asyncio_loop) + + # Run the loop for 2 seconds. This should be enough for 3 retries of 0.3 seconds + handling + # the actual payload. + self.asyncio_loop.run_until_complete( + asyncio.wait_for( + self.tuqueue.work(loop=self.asyncio_loop), + timeout=2 + ) + ) + + # There should only be one attempt at delivering this payload. + self.assertEqual(1, tries) + self.assertEqual([], list(self.tuqueue._queue()))