[16.0] [ENH] queue_job: identity_key enhancements #546

Closed
2 changes: 1 addition & 1 deletion queue_job/job.py
@@ -300,7 +300,7 @@ def job_record_with_same_identity_key(self):
.search(
[
("identity_key", "=", self.identity_key),
("state", "in", [PENDING, ENQUEUED]),
("state", "in", [WAIT_DEPENDENCIES, PENDING, ENQUEUED, STARTED]),
Member
Out of curiosity, did you actually use identity keys on a graph of jobs?
When I was working on the dependencies feature, I had a lot of thoughts and no clear outcome of how it should be handled. A job that would be skipped because it already exists in another graph could make the whole (new) graph incoherent, that's why I ended up checking if all the identity keys match.


About the addition of STARTED, I'm uncertain. Depending on the use case, we should or should not include it.
As part of my current work I use Sidekiq daily. It has a similar feature, but you can set a per-job parameter unique_until with two options: start (which would mean up to ENQUEUED here) or success (which would be up to STARTED here).

Think about this use case: a job refreshes a cache. Data have changed, we create a pending job. Data change again, no new job because the job is still pending. Job starts. Data change while the job is running. In this very case, we'd like to enqueue a new job otherwise the cache will be outdated.

I reckon that both cases are valid, but I fear that adding this state to the domain may break existing behaviors, silently and in subtle ways.

Author
@guewen

No, I did not use it with a graph.

To give it context: a flag switch on res_company for a custom scenario, where a lot of data was involved, triggered an extremely long process of write-outs and read-ins, so we felt a background job would help the UX. So it is always one job.

The "flag" to indicate the state will not be available until after the job is complete, so we were using the identity key to ensure it does not get run twice.

If we do not include "started", then a second click could launch a new job while the first one is already executing... and if the jobs ran in parallel, we'd be in strife. So we were hoping the identity key would provide that mutual exclusion.

So yes, maybe both cases are valid... I was going to add a new option for "mutual exclusivity", or "until finished", but did not want to change the architecture so much. We really need started to be included in our checks, because a job is likely to stay enqueued for only a few seconds, but to run for a very, very long time...

Member
In case you want to explore this direction, I do not think it is a large change: we would not need a new argument, not even in with_delay, as all jobs for the same method will have the same option. It could be a new field on queue.job.function with the two options; the job instance can then access it through self.job_options and adapt the domain accordingly.
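To make the idea concrete, here is a minimal sketch of how such an option could drive the search domain. This is not part of the module or this PR: the option name unique_until (borrowed from Sidekiq) and the helper names are hypothetical, and only the state values mirror what queue_job actually uses.

```python
# State values as used by queue_job (assumed from the module's constants).
WAIT_DEPENDENCIES = "wait_dependencies"
PENDING = "pending"
ENQUEUED = "enqueued"
STARTED = "started"


def identity_search_states(unique_until):
    """States to match when deduplicating jobs by identity key.

    ``unique_until`` mirrors Sidekiq's per-job option:
    - "start": the key is unique until the job starts running
    - "success": the key stays unique while the job is running too
    """
    states = [WAIT_DEPENDENCIES, PENDING, ENQUEUED]
    if unique_until == "success":
        states.append(STARTED)
    return states


def identity_domain(identity_key, unique_until="start"):
    # Odoo-style search domain, shaped like the one in
    # job_record_with_same_identity_key(); unique_until would come
    # from the job function's configuration (self.job_options).
    return [
        ("identity_key", "=", identity_key),
        ("state", "in", identity_search_states(unique_until)),
    ]
```

With "start" the behavior matches the original domain plus WAIT_DEPENDENCIES; "success" additionally covers STARTED, which is the mutual-exclusion case described above.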

Member
Worth noting: the identity key cannot be a 100% safe way to prevent duplicate jobs: if two transactions create the same job concurrently before committing, there will be a duplicate. To prevent this we would need a postgres unique constraint, and I preferred not to add one, to avoid having transactions rolled back because of it.
So it is better if jobs are idempotent anyway and verify during execution whether they still have something to do (this can be coupled with a lock or an advisory lock).
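A rough sketch of that idempotency pattern, assuming a Postgres transaction-scoped advisory lock. Nothing here is queue_job API: the helper names, the SHA-256 key derivation, and the still_needed callback are all illustrative choices.

```python
import hashlib
import struct


def advisory_lock_key(identity_key):
    """Derive a signed 64-bit key for pg_try_advisory_xact_lock()
    from a job's identity key (any stable hash would do)."""
    digest = hashlib.sha256(identity_key.encode()).digest()
    return struct.unpack("<q", digest[:8])[0]


def run_idempotent(cr, identity_key, still_needed, do_work):
    """Sketch of an idempotent job body guarded by an advisory lock.

    ``cr`` is a DB-API cursor; ``still_needed`` re-checks inside the
    job whether there is anything left to do, since a duplicate may
    have slipped past the identity-key search.
    """
    cr.execute(
        "SELECT pg_try_advisory_xact_lock(%s)",
        (advisory_lock_key(identity_key),),
    )
    (locked,) = cr.fetchone()
    if not locked or not still_needed():
        # Another worker holds the lock, or the work was already done.
        return False
    do_work()
    return True
```

The transaction-scoped variant (pg_try_advisory_xact_lock) releases automatically on commit or rollback, so a crashed job cannot leave the lock held.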

Author
Yes, I see where you are coming from.

],
limit=1,
)
14 changes: 9 additions & 5 deletions queue_job/tests/common.py
@@ -233,6 +233,7 @@ def by_graph(job):
self._perform_graph_jobs(jobs)
else:
self._perform_single_jobs(jobs)
self.enqueued_jobs = []

def _perform_single_jobs(self, jobs):
# we probably don't want to replicate a perfect order here, but at
@@ -252,11 +253,14 @@ def _perform_graph_jobs(self, jobs):

def _add_job(self, *args, **kwargs):
job = Job(*args, **kwargs)
self.enqueued_jobs.append(job)

patcher = mock.patch.object(job, "store")
self._store_patchers.append(patcher)
patcher.start()
if not job.identity_key or all(
j.identity_key != job.identity_key for j in self.enqueued_jobs
):
self.enqueued_jobs.append(job)

patcher = mock.patch.object(job, "store")
self._store_patchers.append(patcher)
patcher.start()

job_args = kwargs.pop("args", None) or ()
job_kwargs = kwargs.pop("kwargs", None) or {}
34 changes: 34 additions & 0 deletions test_queue_job/tests/test_delay_mocks.py
@@ -82,6 +82,38 @@ def test_trap_jobs_on_with_delay_recordset_partial_properties(self):
),
)

def test_trap_with_identity_key(self):
with trap_jobs() as trap:
self.env["test.queue.job"].button_that_uses_with_delay()
trap.assert_jobs_count(1)
trap.assert_jobs_count(1, only=self.env["test.queue.job"].testing_method)

trap.assert_enqueued_job(
self.env["test.queue.job"].testing_method,
args=(1,),
kwargs={"foo": 2},
properties=dict(
channel="root.test",
description="Test",
eta=15,
identity_key=identity_exact,
max_retries=1,
priority=15,
),
)

# Should not enqueue again
self.env["test.queue.job"].button_that_uses_with_delay()
trap.assert_jobs_count(1)

trap.perform_enqueued_jobs()
# Should no longer be enqueued
trap.assert_jobs_count(0)

# Can now requeue
self.env["test.queue.job"].button_that_uses_with_delay()
trap.assert_jobs_count(1)

def test_trap_jobs_on_with_delay_assert_model_count_mismatch(self):
recordset = self.env["test.queue.job"].create({"name": "test"})
with trap_jobs() as trap:
@@ -219,6 +251,8 @@ def test_trap_jobs_perform(self):
# perform the jobs
trap.perform_enqueued_jobs()

trap.assert_jobs_count(0)

logs = self.env["ir.logging"].search(
[
("name", "=", "test_queue_job"),