From 8cf07ddbeb48745510d7333c6859831ce8d52480 Mon Sep 17 00:00:00 2001
From: Richard deMeester
Date: Fri, 16 Jun 2023 18:51:00 +1000
Subject: [PATCH 1/2] [IMP] queue_job: identity_key enhancements

1. In production, a job which is waiting on dependencies or which has
   started but not completed should not be repeated if the identity_key
   matches.

2. In tests, the mock queue handler is now enhanced to better mimic the
   identity_key blocking done in production.

3. In tests, the mock queue handler now clears the enqueued jobs after
   performing them, to better reproduce what a production environment
   would do.
---
 queue_job/job.py                         |  2 +-
 queue_job/tests/common.py                | 14 ++++++----
 test_queue_job/tests/test_delay_mocks.py | 34 ++++++++++++++++++++++++
 3 files changed, 44 insertions(+), 6 deletions(-)

diff --git a/queue_job/job.py b/queue_job/job.py
index 457df97ecd..021f2a2c51 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -300,7 +300,7 @@ def job_record_with_same_identity_key(self):
             .search(
                 [
                     ("identity_key", "=", self.identity_key),
-                    ("state", "in", [PENDING, ENQUEUED]),
+                    ("state", "in", [WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED]),
                 ],
                 limit=1,
             )
diff --git a/queue_job/tests/common.py b/queue_job/tests/common.py
index b6ebf96f71..aa8ea73d28 100644
--- a/queue_job/tests/common.py
+++ b/queue_job/tests/common.py
@@ -233,6 +233,7 @@ def by_graph(job):
             self._perform_graph_jobs(jobs)
         else:
             self._perform_single_jobs(jobs)
+        self.enqueued_jobs = []
 
     def _perform_single_jobs(self, jobs):
         # we probably don't want to replicate a perfect order here, but at
@@ -252,11 +253,14 @@ def _perform_graph_jobs(self, jobs):
 
     def _add_job(self, *args, **kwargs):
         job = Job(*args, **kwargs)
-        self.enqueued_jobs.append(job)
-
-        patcher = mock.patch.object(job, "store")
-        self._store_patchers.append(patcher)
-        patcher.start()
+        if not job.identity_key or all(
+            j.identity_key != job.identity_key for j in self.enqueued_jobs
+        ):
+            self.enqueued_jobs.append(job)
+
+            patcher = mock.patch.object(job, "store")
+            self._store_patchers.append(patcher)
+            patcher.start()
 
         job_args = kwargs.pop("args", None) or ()
         job_kwargs = kwargs.pop("kwargs", None) or {}
diff --git a/test_queue_job/tests/test_delay_mocks.py b/test_queue_job/tests/test_delay_mocks.py
index 3238f15d5a..d513a90797 100644
--- a/test_queue_job/tests/test_delay_mocks.py
+++ b/test_queue_job/tests/test_delay_mocks.py
@@ -82,6 +82,38 @@ def test_trap_jobs_on_with_delay_recordset_partial_properties(self):
             ),
         )
 
+    def test_trap_with_identity_key(self):
+        with trap_jobs() as trap:
+            self.env["test.queue.job"].button_that_uses_with_delay()
+            trap.assert_jobs_count(1)
+            trap.assert_jobs_count(1, only=self.env["test.queue.job"].testing_method)
+
+            trap.assert_enqueued_job(
+                self.env["test.queue.job"].testing_method,
+                args=(1,),
+                kwargs={"foo": 2},
+                properties=dict(
+                    channel="root.test",
+                    description="Test",
+                    eta=15,
+                    identity_key=identity_exact,
+                    max_retries=1,
+                    priority=15,
+                ),
+            )
+
+            # Should not enqueue again
+            self.env["test.queue.job"].button_that_uses_with_delay()
+            trap.assert_jobs_count(1)
+
+            trap.perform_enqueued_jobs()
+            # Should no longer be enqueued
+            trap.assert_jobs_count(0)
+
+            # Can now requeue
+            self.env["test.queue.job"].button_that_uses_with_delay()
+            trap.assert_jobs_count(1)
+
     def test_trap_jobs_on_with_delay_assert_model_count_mismatch(self):
         recordset = self.env["test.queue.job"].create({"name": "test"})
         with trap_jobs() as trap:
@@ -219,6 +251,8 @@ def test_trap_jobs_perform(self):
             # perform the jobs
             trap.perform_enqueued_jobs()
 
+            trap.assert_jobs_count(0)
+
             logs = self.env["ir.logging"].search(
                 [
                     ("name", "=", "test_queue_job"),

From 53bc60e399c8f25a71fa6c664d2860aad070412c Mon Sep 17 00:00:00 2001
From: Simone Orsi
Date: Tue, 21 Nov 2023 17:21:03 +0100
Subject: [PATCH 2/2] queue_job: job_record_with_same_identity_key ignore STARTED

Good explanation from Guewen:

Depending on the use case, we should or should not include STARTED.

As part of my current work, I use Sidekiq daily. They have a similar
feature, but you can set a per-job parameter unique_until with options:
start (that would mean up to ENQUEUED here) or success (that would be
up to STARTED here).

Think about this use case: a job refreshes a cache. Data have changed,
we create a pending job. Data change again, no new job because the job
is still pending. The job starts. Data change while the job is running.
In this very case, we'd like to enqueue a new job, otherwise the cache
will be outdated.

I reckon that both cases are valid, but I fear that adding this state
to the domain may, silently and in subtle ways, change existing
behaviors.
---
 queue_job/job.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/queue_job/job.py b/queue_job/job.py
index 021f2a2c51..af0bafb943 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -300,7 +300,7 @@ def job_record_with_same_identity_key(self):
             .search(
                 [
                     ("identity_key", "=", self.identity_key),
-                    ("state", "in", [WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED]),
+                    ("state", "in", [WAIT_DEPENDENCIES, PENDING, ENQUEUED]),
                 ],
                 limit=1,
             )
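
For context (not part of the patches above), a minimal caller-side sketch of
the identity_key deduplication these changes affect; the model and method
names below are hypothetical:

from odoo import models
from odoo.addons.queue_job.job import identity_exact


class MyModel(models.Model):
    _inherit = "my.model"  # hypothetical model

    def action_refresh_cache(self):
        # identity_exact derives a key from the method, recordset and
        # arguments. While a queue.job record with the same key is still in
        # one of the states checked by job_record_with_same_identity_key()
        # (wait_dependencies, pending or enqueued after patch 2/2), calling
        # this again returns the existing job instead of enqueuing a new one.
        return self.with_delay(identity_key=identity_exact).refresh_cache()

    def refresh_cache(self):
        # hypothetical job payload
        ...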