[ADD] Automate requeue of inactive jobs
* Job Queue Lock model added to track jobs being actively processed by Odoo
* Any job not being actively worked on will either be:
    * Requeued, if max_retries not reached
    * Marked as failed, if max_retries reached
* Covers cases where the Odoo instance is restarted or a job is killed by
  Odoo due to exceeding the limit_time_cpu setting

Backport of enhancement from Odoo 16 PR on OCA:  OCA#716
PeterC10 committed Feb 11, 2025
1 parent d61950a commit b3865ca
Showing 6 changed files with 161 additions and 19 deletions.
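
The change relies on PostgreSQL row locks: a worker holds a FOR UPDATE lock on its queue_job_lock row for the lifetime of its transaction, and the runner's requeue query uses FOR UPDATE SKIP LOCKED so it only sees rows whose holder has gone away. Below is a minimal sketch of that interaction outside Odoo, using psycopg2 against a hypothetical test database; the connection string and the pre-existing lock row with id 42 are assumptions made purely for illustration.

import psycopg2

# Hypothetical connection string for a local test database; adjust as needed.
DSN = "dbname=queue_demo user=odoo password=odoo host=localhost"

worker = psycopg2.connect(DSN)   # simulates the HTTP worker performing the job
runner = psycopg2.connect(DSN)   # simulates the job runner's requeue pass

with worker.cursor() as cr:
    # Like Job.lock(): hold the job's lock row for the transaction's lifetime.
    cr.execute(
        "SELECT * FROM queue_job_lock WHERE queue_job_id = %s FOR UPDATE",
        (42,),  # assumed id of a lock row created by add_lock_record()
    )

with runner.cursor() as cr:
    # SKIP LOCKED makes the requeue pass ignore rows held by live workers,
    # so only jobs whose transaction died (crash, SIGKILL) would be returned.
    cr.execute("SELECT queue_job_id FROM queue_job_lock FOR UPDATE SKIP LOCKED")
    print(cr.fetchall())  # the row locked above is absent from this result

worker.rollback()  # ending the worker's transaction releases the lock row
runner.rollback()

Because the worker never commits between locking the row and finishing the job, any abnormal termination drops the connection and PostgreSQL releases the lock, which is exactly what the requeue pass detects.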
2 changes: 1 addition & 1 deletion queue_job/controllers/main.py
@@ -56,7 +56,7 @@ def _try_perform_job(self, env, job):
job.set_started()
job.store()
http.request.env.cr.commit()

job.lock()
_logger.debug('%s started', job)
job.perform()
job.set_done()
55 changes: 55 additions & 0 deletions queue_job/job.py
@@ -252,6 +252,60 @@ def load(cls, env, job_uuid):
'Job %s does no longer exist in the storage.' % job_uuid)
return cls._load_from_db_record(stored)

def add_lock_record(self):
"""
Create a row in queue_job_lock to be locked while the job is
being performed.
"""
self.env.cr.execute(
"""
INSERT INTO
queue_job_lock (id, queue_job_id)
SELECT
id, id
FROM
queue_job
WHERE
uuid = %s
ON CONFLICT(id)
DO NOTHING
""",
[self.uuid],
)

def lock(self):
"""
Lock the queue_job_lock row of the job being performed.
If the row cannot be locked, the job was not started,
and a RetryableJobError is raised.
"""
self.env.cr.execute(
"""
SELECT
*
FROM
queue_job_lock
WHERE
queue_job_id in (
SELECT
id
FROM
queue_job
WHERE
uuid = %s
AND state='started'
)
FOR UPDATE
""",
[self.uuid],
)

# exactly one row should have been locked
if 1 != len(self.env.cr.fetchall()):
raise RetryableJobError(
f"Trying to lock job that wasn't started, uuid: {self.uuid}"
)

@classmethod
def _load_from_db_record(cls, job_db_record):
stored = job_db_record
@@ -651,6 +705,7 @@ def set_enqueued(self):
def set_started(self):
self.state = STARTED
self.date_started = datetime.now()
self.add_lock_record()

def set_done(self, result=None):
self.state = DONE
105 changes: 87 additions & 18 deletions queue_job/jobrunner/runner.py
@@ -148,7 +148,7 @@
import odoo
from odoo.tools import config

from .channels import ChannelManager, PENDING, ENQUEUED, NOT_DONE
from .channels import ChannelManager, PENDING, ENQUEUED, NOT_DONE, FAILED

SELECT_TIMEOUT = 60
ERROR_RECOVERY_DELAY = 5
@@ -214,19 +214,6 @@ def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
response = session.get(url, timeout=30, auth=auth)
response.raise_for_status()

# Method to set failed job (due to timeout, etc) as pending,
# to avoid keeping it as enqueued.
def set_job_pending():
connection_info = _connection_info_for(db_name)
conn = psycopg2.connect(**connection_info)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
with closing(conn.cursor()) as cr:
cr.execute(
"UPDATE queue_job SET state=%s, "
"date_enqueued=NULL, date_started=NULL "
"WHERE uuid=%s and state=%s", (PENDING, job_uuid, ENQUEUED)
)

# TODO: better way to HTTP GET asynchronously (grequest, ...)?
# if this was python3 I would be doing this with
# asyncio, aiohttp and aiopg
@@ -249,12 +236,8 @@ def urlopen():
# for HTTP Response codes between 400 and 500 or a Server Error
# for codes between 500 and 600
response.raise_for_status()
except requests.Timeout:
set_job_pending()
except Exception:
_logger.exception("exception in GET %s", url)
session.cookies.clear()
set_job_pending()
thread = threading.Thread(target=urlopen)
thread.daemon = True
thread.start()
@@ -342,6 +325,86 @@ def set_job_enqueued(self, uuid):
"WHERE uuid=%s",
(ENQUEUED, uuid))

def _query_requeue_dead_jobs(self):
return """
UPDATE
queue_job
SET
state=(
CASE
WHEN
max_retries IS NOT NULL AND
retry IS NOT NULL AND
retry>=max_retries
THEN 'failed'
ELSE 'pending'
END),
retry=(CASE WHEN state='started' THEN COALESCE(retry,0)+1 ELSE retry END),
exc_info=(
CASE
WHEN
max_retries IS NOT NULL AND
retry IS NOT NULL AND
retry>=max_retries
THEN 'Job not completed, max retries reached'
ELSE exc_info
END)
WHERE
id in (
SELECT
queue_job_id
FROM
queue_job_lock
WHERE
queue_job_id in (
SELECT
id
FROM
queue_job
WHERE
state IN ('enqueued','started')
AND date_enqueued <
(now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
)
FOR UPDATE SKIP LOCKED
)
RETURNING uuid, state, name, method_name
"""

def requeue_dead_jobs(self):
"""
Requeue 'enqueued' and 'started' jobs whose lock row is not held,
setting them back to 'pending'.
A job holds its lock row only while it is actually being executed;
when the job's worker is killed, the lock is released.
If the number of retries has reached max_retries, the job is instead
marked 'failed' with the error 'Job not completed, max retries reached'.
A 10-second buffer on 'date_enqueued' ensures a job has been enqueued
long enough to actually start, which prevents requeuing jobs before
they are picked up.
When Odoo shuts down normally, it waits for running jobs to finish.
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
they have been aborted.
Returns information about the inactive jobs: those requeued and those
marked as failed.
"""
pending_job_info = []
failed_job_info = []

with closing(self.conn.cursor()) as cr:
query = self._query_requeue_dead_jobs()
cr.execute(query)
for (uuid, state, name, method) in cr.fetchall():
if state == PENDING:
pending_job_info.append((uuid, name, method))
_logger.warning("Re-queued inactive job with UUID: %s", uuid)
elif state == FAILED:
failed_job_info.append((uuid, name, method))
_logger.warning("Inactive job marked as failed with UUID: %s", uuid)

return pending_job_info, failed_job_info


class QueueJobRunner(object):

@@ -417,6 +480,11 @@ def initialize_databases(self):
self.channel_manager.notify(db_name, *job_data)
_logger.info('queue job runner ready for db %s', db_name)

def requeue_dead_jobs(self):
for db in self.db_by_name.values():
if db.has_queue_job:
db.requeue_dead_jobs()

def run_jobs(self):
now = _odoo_now()
for job in self.channel_manager.get_jobs_to_run(now):
@@ -495,6 +563,7 @@ def run(self):
_logger.info("database connections ready")
# inner loop does the normal processing
while not self._stop:
self.requeue_dead_jobs()
self.process_notifications()
self.run_jobs()
self.wait_notification()
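
For clarity, the CASE expressions in _query_requeue_dead_jobs() boil down to the following per-job decision. This is an illustrative pure-Python restatement, not code from the module; the real decision happens in the SQL above.

def requeue_decision(state, retry, max_retries):
    """Return (new_state, new_retry) for an unlocked 'enqueued'/'started' job."""
    if max_retries is not None and retry is not None and retry >= max_retries:
        new_state = "failed"    # exc_info is set to the max-retries message
    else:
        new_state = "pending"   # the job will be picked up and run again
    # only jobs that had actually started consume a retry
    new_retry = (retry or 0) + 1 if state == "started" else retry
    return new_state, new_retry

assert requeue_decision("started", 0, 5) == ("pending", 1)
assert requeue_decision("enqueued", None, 5) == ("pending", None)
assert requeue_decision("started", 5, 5) == ("failed", 6)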
1 change: 1 addition & 0 deletions queue_job/models/__init__.py
@@ -1,2 +1,3 @@
from . import base
from . import queue_job
from . import queue_job_lock
16 changes: 16 additions & 0 deletions queue_job/models/queue_job_lock.py
@@ -0,0 +1,16 @@
# Copyright 2025 ACSONE SA/NV
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from odoo import fields, models


class QueueJobLock(models.Model):
_name = "queue.job.lock"
_description = "Queue Job Lock"

queue_job_id = fields.Many2one(
comodel_name="queue.job",
required=True,
ondelete="cascade",
index=True,
)
1 change: 1 addition & 0 deletions queue_job/security/ir.model.access.csv
@@ -2,3 +2,4 @@ id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1
access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1
access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1
access_queue_job_lock_manager,queue job lock manager,queue_job.model_queue_job_lock,queue_job.group_queue_job_manager,1,0,0,0
