Migrate Queue Job and Queue Job Subscribe to Odoo 18.0 #696

Status: Closed. Wants to merge 1 commit.
707 changes: 707 additions & 0 deletions queue_job/README.rst

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions queue_job/__init__.py
@@ -0,0 +1,10 @@
from . import controllers
from . import fields
from . import models
from . import wizards
from . import jobrunner
from .post_init_hook import post_init_hook
from .post_load import post_load

# shortcuts
from .job import identity_exact
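
For context, identity_exact (re-exported above as a shortcut) is the stock deduplication key for delayed jobs. A minimal usage sketch, with an illustrative model and job method that are not part of this diff:

from odoo import models

from odoo.addons.queue_job.job import identity_exact


class SaleOrder(models.Model):
    _inherit = "sale.order"

    def action_sync(self):
        # identity_exact builds a key from the recordset, method name and
        # arguments; while an identical job is still pending, delaying it
        # again creates no duplicate.
        self.with_delay(identity_key=identity_exact)._do_sync()

    def _do_sync(self):
        ...  # the work actually performed by the job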
35 changes: 35 additions & 0 deletions queue_job/__manifest__.py
@@ -0,0 +1,35 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

{
    "name": "Job Queue",
    "version": "18.0.1.0.0",
    "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
    "website": "https://github.com/OCA/queue",
    "license": "LGPL-3",
    "category": "Generic Modules",
    "depends": ["mail", "base_sparse_field", "web"],
    "external_dependencies": {"python": ["requests"]},
    "data": [
        "security/security.xml",
        "security/ir.model.access.csv",
        "views/queue_job_views.xml",
        "views/queue_job_channel_views.xml",
        "views/queue_job_function_views.xml",
        "wizards/queue_jobs_to_done_views.xml",
        "wizards/queue_jobs_to_cancelled_views.xml",
        "wizards/queue_requeue_job_views.xml",
        "views/queue_job_menus.xml",
        "data/queue_data.xml",
        "data/queue_job_function_data.xml",
    ],
    "assets": {
        "web.assets_backend": [
            "/queue_job/static/src/views/**/*",
        ],
    },
    "installable": True,
    "development_status": "Mature",
    "maintainers": ["guewen"],
    "post_init_hook": "post_init_hook",
    "post_load": "post_load",
}
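
The post_init_hook and post_load manifest keys point at the callables re-exported in __init__.py above. A rough sketch of their expected shapes, assuming the env-based hook signature used since Odoo 17 (the bodies are placeholders, not the module's actual hooks):

# queue_job/post_init_hook.py (shape only)
def post_init_hook(env):
    # one-time setup executed right after the module is installed
    ...


# queue_job/post_load.py (shape only)
def post_load():
    # executed when the server imports the addon, before any registry is
    # built; typically used to apply monkey patches
    ...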
1 change: 1 addition & 0 deletions queue_job/controllers/__init__.py
@@ -0,0 +1 @@
from . import main
297 changes: 297 additions & 0 deletions queue_job/controllers/main.py
@@ -0,0 +1,297 @@
# Copyright (c) 2015-2016 ACSONE SA/NV (<http://acsone.eu>)
# Copyright 2013-2016 Camptocamp SA
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)

import logging
import random
import time
import traceback
from io import StringIO

from psycopg2 import OperationalError, errorcodes
from werkzeug.exceptions import BadRequest, Forbidden

from odoo import SUPERUSER_ID, _, api, http, registry, tools
from odoo.service.model import PG_CONCURRENCY_ERRORS_TO_RETRY

from ..delay import chain, group
from ..exception import FailedJobError, NothingToDoJob, RetryableJobError
from ..job import ENQUEUED, Job

_logger = logging.getLogger(__name__)

PG_RETRY = 5 # seconds

DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5


class RunJobController(http.Controller):
    def _try_perform_job(self, env, job):
        """Try to perform the job."""
        job.set_started()
        job.store()
        env.cr.commit()
        _logger.debug("%s started", job)

        job.perform()
        job.set_done()
        job.store()
        env.flush_all()
        env.cr.commit()
        _logger.debug("%s done", job)

    def _enqueue_dependent_jobs(self, env, job):
        tries = 0
        while True:
            try:
                job.enqueue_waiting()
            except OperationalError as err:
                # Automatically retry the typical transaction serialization
                # errors
                if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
                    raise
                if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE:
                    _logger.info(
                        "%s, maximum number of tries reached to update dependencies",
                        errorcodes.lookup(err.pgcode),
                    )
                    raise
                wait_time = random.uniform(0.0, 2**tries)
                tries += 1
                _logger.info(
                    "%s, retry %d/%d in %.04f sec...",
                    errorcodes.lookup(err.pgcode),
                    tries,
                    DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE,
                    wait_time,
                )
                time.sleep(wait_time)
            else:
                break
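
As a worked example of the backoff above: with DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE = 5, the successive waits are drawn uniformly from [0, 1), [0, 2), [0, 4), [0, 8) and [0, 16) seconds, an exponentially growing cap with full jitter; a sixth serialization failure re-raises the OperationalError.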

@http.route("/queue_job/runjob", type="http", auth="none", save_session=False)
def runjob(self, db, job_uuid, **kw):
http.request.session.db = db
env = http.request.env(user=SUPERUSER_ID)

def retry_postpone(job, message, seconds=None):
job.env.clear()
with registry(job.env.cr.dbname).cursor() as new_cr:
job.env = api.Environment(new_cr, SUPERUSER_ID, {})
job.postpone(result=message, seconds=seconds)
job.set_pending(reset_retry=False)
job.store()

# ensure the job to run is in the correct state and lock the record
env.cr.execute(
"SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE",
(job_uuid, ENQUEUED),
)
if not env.cr.fetchone():
_logger.warning(
"was requested to run job %s, but it does not exist, "
"or is not in state %s",
job_uuid,
ENQUEUED,
)
return ""

job = Job.load(env, job_uuid)
assert job and job.state == ENQUEUED

try:
try:
self._try_perform_job(env, job)
except OperationalError as err:
# Automatically retry the typical transaction serialization
# errors
if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY:
raise

_logger.debug("%s OperationalError, postponed", job)
raise RetryableJobError(
tools.ustr(err.pgerror, errors="replace"), seconds=PG_RETRY
) from err

except NothingToDoJob as err:
if str(err):
msg = str(err)
else:
msg = _("Job interrupted and set to Done: nothing to do.")
job.set_done(msg)
job.store()
env.cr.commit()

except RetryableJobError as err:
# delay the job later, requeue
retry_postpone(job, str(err), seconds=err.seconds)
_logger.debug("%s postponed", job)
# Do not trigger the error up because we don't want an exception
# traceback in the logs we should have the traceback when all
# retries are exhausted
env.cr.rollback()
return ""

except (FailedJobError, Exception) as orig_exception:
buff = StringIO()
traceback.print_exc(file=buff)
traceback_txt = buff.getvalue()
_logger.error(traceback_txt)
job.env.clear()
with registry(job.env.cr.dbname).cursor() as new_cr:
job.env = job.env(cr=new_cr)
vals = self._get_failure_values(job, traceback_txt, orig_exception)
job.set_failed(**vals)
job.store()
buff.close()
raise

_logger.debug("%s enqueue depends started", job)
self._enqueue_dependent_jobs(env, job)
_logger.debug("%s enqueue depends done", job)

return ""

    def _get_failure_values(self, job, traceback_txt, orig_exception):
        """Collect relevant data from exception."""
        exception_name = orig_exception.__class__.__name__
        if hasattr(orig_exception, "__module__"):
            exception_name = orig_exception.__module__ + "." + exception_name
        exc_message = getattr(orig_exception, "name", str(orig_exception))
        return {
            "exc_info": traceback_txt,
            "exc_name": exception_name,
            "exc_message": exc_message,
        }

    # flake8: noqa: C901
    @http.route("/queue_job/create_test_job", type="http", auth="user")
    def create_test_job(
        self,
        priority=None,
        max_retries=None,
        channel=None,
        description="Test job",
        size=1,
        failure_rate=0,
    ):
        if not http.request.env.user.has_group("base.group_erp_manager"):
            raise Forbidden(_("Access Denied"))

        if failure_rate is not None:
            try:
                failure_rate = float(failure_rate)
            except (ValueError, TypeError):
                failure_rate = 0

        if not (0 <= failure_rate <= 1):
            raise BadRequest("failure_rate must be between 0 and 1")

        if size is not None:
            try:
                size = int(size)
            except (ValueError, TypeError):
                size = 1

        if priority is not None:
            try:
                priority = int(priority)
            except ValueError:
                priority = None

        if max_retries is not None:
            try:
                max_retries = int(max_retries)
            except ValueError:
                max_retries = None

        if size == 1:
            return self._create_single_test_job(
                priority=priority,
                max_retries=max_retries,
                channel=channel,
                description=description,
                failure_rate=failure_rate,
            )

        if size > 1:
            return self._create_graph_test_jobs(
                size,
                priority=priority,
                max_retries=max_retries,
                channel=channel,
                description=description,
                failure_rate=failure_rate,
            )
        return ""

    def _create_single_test_job(
        self,
        priority=None,
        max_retries=None,
        channel=None,
        description="Test job",
        size=1,
        failure_rate=0,
    ):
        delayed = (
            http.request.env["queue.job"]
            .with_delay(
                priority=priority,
                max_retries=max_retries,
                channel=channel,
                description=description,
            )
            ._test_job(failure_rate=failure_rate)
        )
        return f"job uuid: {delayed.db_record().uuid}"

    TEST_GRAPH_MAX_PER_GROUP = 5

    def _create_graph_test_jobs(
        self,
        size,
        priority=None,
        max_retries=None,
        channel=None,
        description="Test job",
        failure_rate=0,
    ):
        model = http.request.env["queue.job"]
        current_count = 0

        possible_grouping_methods = (chain, group)

        tails = []  # we can connect new graph chains/groups to tails
        root_delayable = None
        while current_count < size:
            jobs_count = min(
                size - current_count, random.randint(1, self.TEST_GRAPH_MAX_PER_GROUP)
            )

            jobs = []
            for __ in range(jobs_count):
                current_count += 1
                jobs.append(
                    model.delayable(
                        priority=priority,
                        max_retries=max_retries,
                        channel=channel,
                        description="%s #%d" % (description, current_count),
                    )._test_job(failure_rate=failure_rate)
                )

            grouping = random.choice(possible_grouping_methods)
            delayable = grouping(*jobs)
            if not root_delayable:
                root_delayable = delayable
            else:
                tail_delayable = random.choice(tails)
                tail_delayable.on_done(delayable)
            tails.append(delayable)

        root_delayable.delay()

        return "graph uuid: {}".format(
            list(root_delayable._head())[0]._generated_job.graph_uuid
        )
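
A minimal sketch of the chain and group primitives the graph builder above relies on, assuming an env from an Odoo shell (everything else is as in this module):

from odoo.addons.queue_job.delay import chain, group

model = env["queue.job"]
d1 = model.delayable(description="step 1")._test_job()
d2 = model.delayable(description="step 2a")._test_job()
d3 = model.delayable(description="step 2b")._test_job()

# d1 runs first; once it is done, d2 and d3 run in parallel
chain(d1, group(d2, d3)).delay()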
34 changes: 34 additions & 0 deletions queue_job/data/queue_data.xml
@@ -0,0 +1,34 @@
<?xml version="1.0" encoding="utf-8" ?>
<odoo>
    <data noupdate="1">
        <record id="ir_cron_queue_job_garbage_collector" model="ir.cron">
            <field name="name">Jobs Garbage Collector</field>
            <field name="interval_number">5</field>
            <field name="interval_type">minutes</field>
            <field ref="model_queue_job" name="model_id" />
            <field name="state">code</field>
            <field name="code">model.requeue_stuck_jobs()</field>
        </record>
        <!-- Queue-job-related subtypes for messaging / Chatter -->
        <record id="mt_job_failed" model="mail.message.subtype">
            <field name="name">Job failed</field>
            <field name="res_model">queue.job</field>
            <field name="default" eval="True" />
        </record>
        <record id="ir_cron_autovacuum_queue_jobs" model="ir.cron">
            <field name="name">AutoVacuum Job Queue</field>
            <field ref="model_queue_job" name="model_id" />
            <field eval="True" name="active" />
            <field name="user_id" ref="base.user_root" />
            <field name="interval_number">1</field>
            <field name="interval_type">days</field>
            <field name="state">code</field>
            <field name="code">model.autovacuum()</field>
        </record>
    </data>
    <data noupdate="0">
        <record model="queue.job.channel" id="channel_root">
            <field name="name">root</field>
        </record>
    </data>
</odoo>
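
Both crons simply call public methods on queue.job, so the same maintenance can be run from an Odoo shell; a minimal sketch (env is the shell's Environment):

# what the two crons above execute
env["queue.job"].requeue_stuck_jobs()  # re-enqueue jobs stuck in a transient state
env["queue.job"].autovacuum()  # remove old terminated jobs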
6 changes: 6 additions & 0 deletions queue_job/data/queue_job_function_data.xml
@@ -0,0 +1,6 @@
<odoo noupdate="1">
    <record id="job_function_queue_job__test_job" model="queue.job.function">
        <field name="model_id" ref="queue_job.model_queue_job" />
        <field name="method">_test_job</field>
    </record>
</odoo>