diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py
index 843bfa6e46..22f4ae490c 100644
--- a/queue_job/controllers/main.py
+++ b/queue_job/controllers/main.py
@@ -56,7 +56,7 @@ def _try_perform_job(self, env, job):
         job.set_started()
         job.store()
         http.request.env.cr.commit()
-
+        job.lock()
         _logger.debug('%s started', job)
         job.perform()
         job.set_done()
diff --git a/queue_job/job.py b/queue_job/job.py
index d1ebb62006..eda984324b 100644
--- a/queue_job/job.py
+++ b/queue_job/job.py
@@ -252,6 +252,60 @@ def load(cls, env, job_uuid):
                 'Job %s does no longer exist in the storage.' % job_uuid)
         return cls._load_from_db_record(stored)

+    def add_lock_record(self):
+        """
+        Create a row in queue_job_lock to be locked while the job is performed.
+        """
+        self.env.cr.execute(
+            """
+            INSERT INTO
+                queue_job_lock (id, queue_job_id)
+            SELECT
+                id, id
+            FROM
+                queue_job
+            WHERE
+                uuid = %s
+            ON CONFLICT(id)
+            DO NOTHING
+            """,
+            [self.uuid],
+        )
+
+    def lock(self):
+        """
+        Lock the row of the job that is being performed.
+
+        If the row cannot be locked, the job wasn't started,
+        and a RetryableJobError is raised.
+        """
+        self.env.cr.execute(
+            """
+            SELECT
+                *
+            FROM
+                queue_job_lock
+            WHERE
+                queue_job_id in (
+                    SELECT
+                        id
+                    FROM
+                        queue_job
+                    WHERE
+                        uuid = %s
+                        AND state='started'
+                )
+            FOR UPDATE
+            """,
+            [self.uuid],
+        )
+
+        # exactly one row must have been locked
+        if 1 != len(self.env.cr.fetchall()):
+            raise RetryableJobError(
+                f"Trying to lock job that wasn't started, uuid: {self.uuid}"
+            )
+
     @classmethod
     def _load_from_db_record(cls, job_db_record):
         stored = job_db_record
@@ -651,6 +705,7 @@ def set_enqueued(self):
     def set_started(self):
         self.state = STARTED
         self.date_started = datetime.now()
+        self.add_lock_record()

     def set_done(self, result=None):
         self.state = DONE
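[Reviewer note, not part of the patch] Job.lock() works because PostgreSQL
row locks are held until the worker's transaction ends: while perform() runs,
the queue_job_lock row stays locked, and if the worker process dies the lock
is released automatically. A minimal standalone sketch of the same pattern
with plain psycopg2 (the "demo_lock" table and connection details are
illustrative, not part of the module):

    import psycopg2

    worker = psycopg2.connect(dbname="test")  # plays the job worker
    runner = psycopg2.connect(dbname="test")  # plays the jobrunner

    with worker.cursor() as cr:
        cr.execute("CREATE TABLE IF NOT EXISTS demo_lock (id int PRIMARY KEY)")
        cr.execute("INSERT INTO demo_lock VALUES (1) ON CONFLICT DO NOTHING")
    worker.commit()

    # The worker locks the row and keeps its transaction open,
    # as Job.lock() does for the duration of perform().
    with worker.cursor() as cr:
        cr.execute("SELECT id FROM demo_lock WHERE id = 1 FOR UPDATE")

    # SKIP LOCKED makes the running job invisible to the runner, so only
    # dead (unlocked) jobs are requeue candidates (see the runner changes below).
    with runner.cursor() as cr:
        cr.execute("SELECT id FROM demo_lock FOR UPDATE SKIP LOCKED")
        print(cr.fetchall())  # [] while the worker transaction is open

    worker.rollback()  # a crash releases the lock the same way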
diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py
index 35db4405fe..3ab31a9a8f 100644
--- a/queue_job/jobrunner/runner.py
+++ b/queue_job/jobrunner/runner.py
@@ -148,7 +148,7 @@
 import odoo
 from odoo.tools import config

-from .channels import ChannelManager, PENDING, ENQUEUED, NOT_DONE
+from .channels import ChannelManager, PENDING, ENQUEUED, NOT_DONE, FAILED

 SELECT_TIMEOUT = 60
 ERROR_RECOVERY_DELAY = 5
@@ -214,19 +214,6 @@ def _async_http_get(scheme, host, port, user, password, db_name, job_uuid):
         response = session.get(url, timeout=30, auth=auth)
         response.raise_for_status()

-    # Method to set failed job (due to timeout, etc) as pending,
-    # to avoid keeping it as enqueued.
-    def set_job_pending():
-        connection_info = _connection_info_for(db_name)
-        conn = psycopg2.connect(**connection_info)
-        conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
-        with closing(conn.cursor()) as cr:
-            cr.execute(
-                "UPDATE queue_job SET state=%s, "
-                "date_enqueued=NULL, date_started=NULL "
-                "WHERE uuid=%s and state=%s", (PENDING, job_uuid, ENQUEUED)
-            )
-
     # TODO: better way to HTTP GET asynchronously (grequest, ...)?
     #       if this was python3 I would be doing this with
     #       asyncio, aiohttp and aiopg
@@ -249,12 +236,8 @@ def urlopen():
             # for HTTP Response codes between 400 and 500 or a Server Error
             # for codes between 500 and 600
             response.raise_for_status()
-        except requests.Timeout:
-            set_job_pending()
         except Exception:
-            _logger.exception("exception in GET %s", url)
             session.cookies.clear()
-            set_job_pending()
     thread = threading.Thread(target=urlopen)
     thread.daemon = True
     thread.start()
@@ -342,6 +325,86 @@ def set_job_enqueued(self, uuid):
                 "WHERE uuid=%s", (ENQUEUED, uuid))

+    def _query_requeue_dead_jobs(self):
+        return """
+            UPDATE
+                queue_job
+            SET
+                state=(
+                    CASE
+                        WHEN
+                            max_retries IS NOT NULL AND
+                            retry IS NOT NULL AND
+                            retry>=max_retries
+                        THEN 'failed'
+                        ELSE 'pending'
+                    END),
+                retry=(CASE WHEN state='started' THEN COALESCE(retry,0)+1 ELSE retry END),
+                exc_info=(
+                    CASE
+                        WHEN
+                            max_retries IS NOT NULL AND
+                            retry IS NOT NULL AND
+                            retry>=max_retries
+                        THEN 'Job not completed, max retries reached'
+                        ELSE exc_info
+                    END)
+            WHERE
+                id in (
+                    SELECT
+                        queue_job_id
+                    FROM
+                        queue_job_lock
+                    WHERE
+                        queue_job_id in (
+                            SELECT
+                                id
+                            FROM
+                                queue_job
+                            WHERE
+                                state IN ('enqueued','started')
+                                AND date_enqueued <
+                                    (now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
+                        )
+                    FOR UPDATE SKIP LOCKED
+                )
+            RETURNING uuid, state, name, method_name
+        """
+
+    def requeue_dead_jobs(self):
+        """
+        Requeue dead jobs: set enqueued or started jobs whose lock row
+        is not locked back to 'pending'.
+        A job's lock row is held while the job is being executed;
+        when the worker is killed, the lock is released.
+        If the number of retries has reached max_retries, the job is
+        instead set to 'failed' with the error
+        'Job not completed, max retries reached'.
+        A 10 second buffer on 'date_enqueued' checks that the job has
+        been enqueued for more than 10 seconds; this prevents requeuing
+        jobs before they are actually started.
+        When Odoo shuts down normally, it waits for running jobs to finish.
+        However, when the Odoo server crashes or is otherwise force-stopped,
+        running jobs are interrupted while the runner has no chance to know
+        they have been aborted.
+
+        Returns information about the dead jobs: those requeued and those
+        marked as failed.
+        """
+        pending_job_info = []
+        failed_job_info = []
+
+        with closing(self.conn.cursor()) as cr:
+            query = self._query_requeue_dead_jobs()
+            cr.execute(query)
+            for (uuid, state, name, method) in cr.fetchall():
+                if state == PENDING:
+                    pending_job_info.append((uuid, name, method))
+                    _logger.warning("Re-queued inactive job with UUID: %s", uuid)
+                elif state == FAILED:
+                    failed_job_info.append((uuid, name, method))
+                    _logger.warning("Inactive job marked as failed with UUID: %s", uuid)
+
+        return pending_job_info, failed_job_info
+

 class QueueJobRunner(object):
@@ -417,6 +480,11 @@ def initialize_databases(self):
                 self.channel_manager.notify(db_name, *job_data)
             _logger.info('queue job runner ready for db %s', db_name)

+    def requeue_dead_jobs(self):
+        for db in self.db_by_name.values():
+            if db.has_queue_job:
+                db.requeue_dead_jobs()
+
     def run_jobs(self):
         now = _odoo_now()
         for job in self.channel_manager.get_jobs_to_run(now):
@@ -495,6 +563,7 @@ def run(self):
             _logger.info("database connections ready")
             # inner loop does the normal processing
             while not self._stop:
+                self.requeue_dead_jobs()
                 self.process_notifications()
                 self.run_jobs()
                 self.wait_notification()
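[Reviewer note, not part of the patch] The UPDATE above decides, per dead
job, whether it goes back to 'pending' or ends up 'failed'. The same
decision expressed in plain Python, as a sketch to clarify the CASE logic
(the function name is illustrative):

    def dead_job_transition(state, retry, max_retries):
        """Mirror of the CASE expressions in _query_requeue_dead_jobs()."""
        if max_retries is not None and retry is not None and retry >= max_retries:
            new_state, exc_info = "failed", "Job not completed, max retries reached"
        else:
            new_state, exc_info = "pending", None  # exc_info left unchanged
        # only jobs that actually started consumed a retry
        new_retry = (retry or 0) + 1 if state == "started" else retry
        return new_state, new_retry, exc_info

    assert dead_job_transition("started", 0, 5) == ("pending", 1, None)
    assert dead_job_transition("started", 5, 5)[0] == "failed"
    assert dead_job_transition("enqueued", None, 5) == ("pending", None, None)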
diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py
index 0909032522..fa9116cf63 100644
--- a/queue_job/models/__init__.py
+++ b/queue_job/models/__init__.py
@@ -1,2 +1,3 @@
 from . import base
 from . import queue_job
+from . import queue_job_lock
diff --git a/queue_job/models/queue_job_lock.py b/queue_job/models/queue_job_lock.py
new file mode 100644
index 0000000000..b01c7f3a91
--- /dev/null
+++ b/queue_job/models/queue_job_lock.py
@@ -0,0 +1,16 @@
+# Copyright 2025 ACSONE SA/NV
+# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).
+
+from odoo import fields, models
+
+
+class QueueJobLock(models.Model):
+    _name = "queue.job.lock"
+    _description = "Queue Job Lock"
+
+    queue_job_id = fields.Many2one(
+        comodel_name="queue.job",
+        required=True,
+        ondelete="cascade",
+        index=True,
+    )
diff --git a/queue_job/security/ir.model.access.csv b/queue_job/security/ir.model.access.csv
index e90eee9ae4..c5580a3545 100644
--- a/queue_job/security/ir.model.access.csv
+++ b/queue_job/security/ir.model.access.csv
@@ -2,3 +2,4 @@ id,name,model_id:id,group_id:id,perm_read,perm_write,perm_create,perm_unlink
 access_queue_job_manager,queue job manager,queue_job.model_queue_job,queue_job.group_queue_job_manager,1,1,1,1
 access_queue_job_function_manager,queue job functions manager,queue_job.model_queue_job_function,queue_job.group_queue_job_manager,1,1,1,1
 access_queue_job_channel_manager,queue job channel manager,queue_job.model_queue_job_channel,queue_job.group_queue_job_manager,1,1,1,1
+access_queue_job_lock_manager,queue job lock manager,queue_job.model_queue_job_lock,queue_job.group_queue_job_manager,1,0,0,0
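[Reviewer note, not part of the patch] Putting the pieces together, the
intended lifecycle under this change, as I read the diff:

    # worker transaction                 # runner loop, each iteration
    # ------------------                 # ----------------------------
    # job.set_started()                  # requeue_dead_jobs():
    #   -> add_lock_record()             #   UPDATE queue_job ... WHERE id IN (
    # job.store(); cr.commit()           #     SELECT queue_job_id
    # job.lock()   # FOR UPDATE          #     FROM queue_job_lock ...
    # job.perform()                      #     FOR UPDATE SKIP LOCKED)
    # job.set_done(); cr.commit()        #
    # A running job holds its lock row, so SKIP LOCKED passes over it;
    # a killed worker releases the lock, and its job is requeued (or
    # failed once retry >= max_retries) after the 10 second buffer.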