diff --git a/queue_job/README.rst b/queue_job/README.rst index 64fa696df7..80a856315e 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -378,13 +378,14 @@ Based on this configuration, we can tell that: When you are developing (ie: connector modules) you might want to bypass the queue job and run your code immediately. -To do so you can set `TEST_QUEUE_JOB_NO_DELAY=1` in your enviroment. +To do so you can set `QUEUE_JOB_NO_DELAY=1` in your enviroment. **Bypass jobs in tests** When writing tests on job-related methods is always tricky to deal with -delayed recordsets. To make your testing life easier -you can set `test_queue_job_no_delay=True` in the context. +delayed recordsets. To make your testing life easier, +or to run a delayed action immediately, +you can set `queue_job__no_delay=True` in the context. Tip: you can do this at test case level like this @@ -395,7 +396,7 @@ Tip: you can do this at test case level like this super().setUpClass() cls.env = cls.env(context=dict( cls.env.context, - test_queue_job_no_delay=True, # no jobs thanks + queue_job__no_delay=True, # no jobs thanks )) Then all your tests execute the job methods synchronously diff --git a/queue_job/__init__.py b/queue_job/__init__.py index 34b2e85788..6ead7beb1f 100644 --- a/queue_job/__init__.py +++ b/queue_job/__init__.py @@ -1,5 +1,6 @@ from . import controllers from . import fields from . import models +from . import wizards from . import jobrunner from .hooks.post_init_hook import post_init_hook diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index 7b9db132cf..fbbb990860 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -1,24 +1,31 @@ # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) -{'name': 'Job Queue', - 'version': '12.0.4.1.0', - 'author': 'Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)', - 'website': 'https://github.com/OCA/queue', - 'license': 'LGPL-3', - 'category': 'Generic Modules', - 'depends': ['mail'], - 'external_dependencies': {'python': ['requests' - ], - }, - 'data': ['security/security.xml', - 'security/ir.model.access.csv', - 'views/queue_job_views.xml', - 'views/queue_job_assets.xml', - 'data/queue_data.xml', - "data/queue_job_function_data.xml"], - 'installable': True, - 'development_status': 'Mature', - 'maintainers': ['guewen'], - 'post_init_hook': 'post_init_hook' - } +{ + "name": "Job Queue", + "version": "12.0.4.1.0", + "author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)", + "website": "https://github.com/OCA/queue", + "license": "LGPL-3", + "category": "Generic Modules", + "depends": ["mail"], + "external_dependencies": {"python": ["requests"]}, + "data": [ + "security/security.xml", + "security/ir.model.access.csv", + "views/queue_job_assets.xml", + "views/queue_job_views.xml", + "views/queue_job_channel_views.xml", + "views/queue_job_function_views.xml", + "wizards/queue_jobs_to_done_views.xml", + "wizards/queue_requeue_job_views.xml", + "wizards/queue_jobs_to_cancelled_views.xml", + "views/queue_job_menus.xml", + "data/queue_data.xml", + "data/queue_job_function_data.xml", + ], + "installable": True, + "development_status": "Mature", + "maintainers": ["guewen"], + "post_init_hook": "post_init_hook", +} diff --git a/queue_job/delay.py b/queue_job/delay.py index 946f1e4cb8..eaa4bcdd01 100644 --- a/queue_job/delay.py +++ b/queue_job/delay.py @@ -4,12 +4,12 @@ import itertools import logging -import os import uuid from collections import defaultdict, deque from .job import Job +from .utils import must_run_without_delay _logger = logging.getLogger(__name__) @@ -219,18 +219,9 @@ def _has_to_execute_directly(self, vertices): In tests, prefer to use :func:`odoo.addons.queue_job.tests.common.trap_jobs`. """ - if os.getenv('TEST_QUEUE_JOB_NO_DELAY'): - _logger.warn( - '`TEST_QUEUE_JOB_NO_DELAY` env var found. NO JOB scheduled.' - ) - return True - envs = set(vertex.recordset.env for vertex in vertices) + envs = {vertex.recordset.env for vertex in vertices} for env in envs: - if env.context.get('test_queue_job_no_delay'): - _logger.warn( - '`test_queue_job_no_delay` ctx key found.' - ' NO JOB scheduled.' - ) + if must_run_without_delay(env): return True return False diff --git a/queue_job/exception.py b/queue_job/exception.py index 41f89991c4..093344ed3d 100644 --- a/queue_job/exception.py +++ b/queue_job/exception.py @@ -29,7 +29,7 @@ class RetryableJobError(JobError): """ def __init__(self, msg, seconds=None, ignore_retry=False): - super(RetryableJobError, self).__init__(msg) + super().__init__(msg) self.seconds = seconds self.ignore_retry = ignore_retry diff --git a/queue_job/fields.py b/queue_job/fields.py index 72ce5db753..7208e88ba6 100644 --- a/queue_job/fields.py +++ b/queue_job/fields.py @@ -92,10 +92,8 @@ class JobDecoder(json.JSONDecoder): """Decode json, recomposing recordsets""" def __init__(self, *args, **kwargs): - env = kwargs.pop('env') - super(JobDecoder, self).__init__( - object_hook=self.object_hook, *args, **kwargs - ) + env = kwargs.pop("env") + super().__init__(object_hook=self.object_hook, *args, **kwargs) assert env self.env = env diff --git a/queue_job/jobrunner/__init__.py b/queue_job/jobrunner/__init__.py index 240401b9e9..0955c6ef88 100644 --- a/queue_job/jobrunner/__init__.py +++ b/queue_job/jobrunner/__init__.py @@ -50,7 +50,7 @@ class WorkerJobRunner(server.Worker): """ Jobrunner workers """ def __init__(self, multi): - super(WorkerJobRunner, self).__init__(multi) + super().__init__(multi) self.watchdog_timeout = None self.runner = QueueJobRunner.from_environ_or_config() @@ -59,7 +59,7 @@ def sleep(self): def signal_handler(self, sig, frame): _logger.debug("WorkerJobRunner (%s) received signal %s", self.pid, sig) - super(WorkerJobRunner, self).signal_handler(sig, frame) + super().signal_handler(sig, frame) self.runner.stop() def process_work(self): diff --git a/queue_job/jobrunner/channels.py b/queue_job/jobrunner/channels.py index ca9b9fc494..4b9bde297d 100644 --- a/queue_job/jobrunner/channels.py +++ b/queue_job/jobrunner/channels.py @@ -117,7 +117,7 @@ class SafeSet(set): def remove(self, o): # pylint: disable=missing-return,except-pass try: - super(SafeSet, self).remove(o) + super().remove(o) except KeyError: pass diff --git a/queue_job/models/__init__.py b/queue_job/models/__init__.py index b7b1791462..4744e7ab46 100644 --- a/queue_job/models/__init__.py +++ b/queue_job/models/__init__.py @@ -1,3 +1,5 @@ from . import base from . import ir_model_fields from . import queue_job +from . import queue_job_channel +from . import queue_job_function diff --git a/queue_job/models/base.py b/queue_job/models/base.py index d89dc60bbb..8f6b3b3a08 100644 --- a/queue_job/models/base.py +++ b/queue_job/models/base.py @@ -1,6 +1,7 @@ # Copyright 2016 Camptocamp # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from ..utils import must_run_without_delay import functools import inspect import logging @@ -212,8 +213,7 @@ def auto_delay_wrapper(self, *args, **kwargs): if ( self.env.context.get("job_uuid") or not context_delay - or self.env.context.get("_job_force_sync") - or self.env.context.get("test_queue_job_no_delay") + or must_run_without_delay(self.env) ): # we are in the job execution return auto_delay_wrapper.origin(self, *args, **kwargs) diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index ba3a3acfc4..c04c180cbb 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -1,14 +1,10 @@ -# Copyright 2013-2016 Camptocamp SA +# Copyright 2013-2020 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) - -import ast -import logging import random -import re -from collections import namedtuple +import logging from datetime import datetime, timedelta -from odoo import _, api, exceptions, fields, models, tools +from odoo import _, api, exceptions, fields, models from odoo.addons.base_sparse_field.models.fields import Serialized from odoo.osv import expression from odoo.tools import html_escape @@ -30,9 +26,6 @@ _logger = logging.getLogger(__name__) -regex_job_function_name = re.compile(r"^<([0-9a-z_\.]+)>\.([0-9a-zA-Z_]+)$") - - class QueueJob(models.Model): """Model storing the jobs to be executed.""" _name = 'queue.job' @@ -505,404 +498,3 @@ def _test_job(self, failure_rate=0): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") - - -class RequeueJob(models.TransientModel): - _name = 'queue.requeue.job' - _description = 'Wizard to requeue a selection of jobs' - - @api.model - def _default_job_ids(self): - res = False - context = self.env.context - if (context.get('active_model') == 'queue.job' and - context.get('active_ids')): - res = context['active_ids'] - return res - - job_ids = fields.Many2many(comodel_name='queue.job', - string='Jobs', - default=_default_job_ids) - - @api.multi - def requeue(self): - jobs = self.job_ids - jobs.requeue() - return {'type': 'ir.actions.act_window_close'} - - -class SetJobsToDone(models.TransientModel): - _inherit = 'queue.requeue.job' - _name = 'queue.jobs.to.done' - _description = 'Set all selected jobs to done' - - @api.multi - def set_done(self): - jobs = self.job_ids - jobs.button_done() - return {'type': 'ir.actions.act_window_close'} - - -class SetJobsToCancelled(models.TransientModel): - _inherit = 'queue.requeue.job' - _name = 'queue.jobs.to.cancelled' - _description = 'Cancel all selected jobs' - - @api.multi - def set_cancelled(self): - jobs = self.job_ids.filtered( - lambda x: x.state in ('pending', 'failed', 'enqueued') - ) - jobs.button_cancelled() - return {'type': 'ir.actions.act_window_close'} - - -class JobChannel(models.Model): - _name = 'queue.job.channel' - _description = 'Job Channels' - - name = fields.Char() - complete_name = fields.Char(compute='_compute_complete_name', - store=True, - readonly=True) - parent_id = fields.Many2one(comodel_name='queue.job.channel', - string='Parent Channel', - ondelete='restrict') - job_function_ids = fields.One2many(comodel_name='queue.job.function', - inverse_name='channel_id', - string='Job Functions') - removal_interval = fields.Integer( - default=lambda self: self.env['queue.job']._removal_interval, - required=True) - - _sql_constraints = [ - ('name_uniq', - 'unique(complete_name)', - 'Channel complete name must be unique'), - ] - - @api.multi - @api.depends('name', 'parent_id.complete_name') - def _compute_complete_name(self): - for record in self: - if not record.name: - continue # new record - channel = record - parts = [channel.name] - while channel.parent_id: - channel = channel.parent_id - parts.append(channel.name) - record.complete_name = '.'.join(reversed(parts)) - - @api.multi - @api.constrains('parent_id', 'name') - def parent_required(self): - for record in self: - if record.name != 'root' and not record.parent_id: - raise exceptions.ValidationError(_('Parent channel required.')) - - @api.model_create_multi - def create(self, vals_list): - records = self.browse() - if self.env.context.get("install_mode"): - # installing a module that creates a channel: rebinds the channel - # to an existing one (likely we already had the channel created by - # the @job decorator previously) - new_vals_list = [] - for vals in vals_list: - name = vals.get("name") - parent_id = vals.get("parent_id") - if name and parent_id: - existing = self.search( - [("name", "=", name), ("parent_id", "=", parent_id)] - ) - if existing: - if not existing.get_metadata()[0].get("noupdate"): - existing.write(vals) - records |= existing - continue - new_vals_list.append(vals) - vals_list = new_vals_list - records |= super().create(vals_list) - return records - - @api.multi - def write(self, values): - for channel in self: - if (not self.env.context.get('install_mode') and - channel.name == 'root' and - ('name' in values or 'parent_id' in values)): - raise exceptions.Warning(_('Cannot change the root channel')) - return super(JobChannel, self).write(values) - - @api.multi - def unlink(self): - for channel in self: - if channel.name == 'root': - raise exceptions.Warning(_('Cannot remove the root channel')) - return super(JobChannel, self).unlink() - - @api.multi - def name_get(self): - result = [] - for record in self: - result.append((record.id, record.complete_name)) - return result - - -class JobFunction(models.Model): - _name = 'queue.job.function' - _description = 'Job Functions' - _log_access = False - - JobConfig = namedtuple( - "JobConfig", - "channel " - "retry_pattern " - "related_action_enable " - "related_action_func_name " - "related_action_kwargs " - "job_function_id ", - ) - - @api.model - def _default_channel(self): - return self.env.ref('queue_job.channel_root') - - name = fields.Char( - compute="_compute_name", inverse="_inverse_name", index=True, store=True, - ) - - # model and method should be required, but the required flag doesn't - # let a chance to _inverse_name to be executed - model_id = fields.Many2one( - comodel_name="ir.model", string="Model", ondelete="cascade" - ) - method = fields.Char() - - channel_id = fields.Many2one(comodel_name='queue.job.channel', - string='Channel', - required=True, - default=_default_channel) - channel = fields.Char(related='channel_id.complete_name', - store=True, - readonly=True) - retry_pattern = JobSerialized(string="Retry Pattern (serialized)", base_type=dict) - edit_retry_pattern = fields.Text( - string="Retry Pattern", - compute="_compute_edit_retry_pattern", - inverse="_inverse_edit_retry_pattern", - help="Pattern expressing from the count of retries on retryable errors," - " the number of of seconds to postpone the next execution.\n" - "Example: {1: 10, 5: 20, 10: 30, 15: 300}.\n" - "See the module description for details.", - ) - related_action = JobSerialized(string="Related Action (serialized)", base_type=dict) - edit_related_action = fields.Text( - string="Related Action", - compute="_compute_edit_related_action", - inverse="_inverse_edit_related_action", - help="The action when the button *Related Action* is used on a job. " - "The default action is to open the view of the record related " - "to the job. Configured as a dictionary with optional keys: " - "enable, func_name, kwargs.\n" - "See the module description for details.", - ) - - @api.depends("model_id.model", "method") - def _compute_name(self): - for record in self: - if not (record.model_id and record.method): - record.name = "" - continue - record.name = self.job_function_name(record.model_id.model, record.method) - - def _inverse_name(self): - groups = regex_job_function_name.match(self.name) - if not groups: - raise exceptions.UserError(_("Invalid job function: {}").format(self.name)) - model_name = groups.group(1) - method = groups.group(2) - model = self.env["ir.model"].search([("model", "=", model_name)], limit=1) - if not model: - raise exceptions.UserError(_("Model {} not found").format(model_name)) - self.model_id = model.id - self.method = method - - @api.depends("retry_pattern") - def _compute_edit_retry_pattern(self): - for record in self: - retry_pattern = record._parse_retry_pattern() - record.edit_retry_pattern = str(retry_pattern) - - def _inverse_edit_retry_pattern(self): - try: - self.retry_pattern = ast.literal_eval(self.edit_retry_pattern or "{}") - except (ValueError, TypeError): - raise exceptions.UserError(self._retry_pattern_format_error_message()) - - @api.depends("related_action") - def _compute_edit_related_action(self): - for record in self: - record.edit_related_action = str(record.related_action) - - def _inverse_edit_related_action(self): - try: - self.related_action = ast.literal_eval(self.edit_related_action or "{}") - except (ValueError, TypeError): - raise exceptions.UserError(self._related_action_format_error_message()) - - @staticmethod - def job_function_name(model_name, method_name): - return "<{}>.{}".format(model_name, method_name) - - # TODO deprecated by :job-no-decorator: - @api.model - def _find_or_create_channel(self, channel_path): - channel_model = self.env['queue.job.channel'] - parts = channel_path.split('.') - parts.reverse() - channel_name = parts.pop() - assert channel_name == 'root', "A channel path starts with 'root'" - # get the root channel - channel = channel_model.search([('name', '=', channel_name)]) - while parts: - channel_name = parts.pop() - parent_channel = channel - channel = channel_model.search([ - ('name', '=', channel_name), - ('parent_id', '=', parent_channel.id), - ], limit=1) - if not channel: - channel = channel_model.create({ - 'name': channel_name, - 'parent_id': parent_channel.id, - }) - return channel - - def job_default_config(self): - return self.JobConfig( - channel="root", - retry_pattern={}, - related_action_enable=True, - related_action_func_name=None, - related_action_kwargs={}, - job_function_id=None, - ) - - def _parse_retry_pattern(self): - try: - # as json can't have integers as keys and the field is stored - # as json, convert back to int - retry_pattern = { - int(try_count): postpone_seconds - for try_count, postpone_seconds in self.retry_pattern.items() - } - except ValueError: - _logger.error( - "Invalid retry pattern for job function %s," - " keys could not be parsed as integers, fallback" - " to the default retry pattern.", - self.name, - ) - retry_pattern = {} - return retry_pattern - - @tools.ormcache("name") - def job_config(self, name): - config = self.search([("name", "=", name)], limit=1) - if not config: - return self.job_default_config() - retry_pattern = config._parse_retry_pattern() - return self.JobConfig( - channel=config.channel, - retry_pattern=retry_pattern, - related_action_enable=config.related_action.get("enable", True), - related_action_func_name=config.related_action.get("func_name"), - related_action_kwargs=config.related_action.get("kwargs"), - job_function_id=config.id, - ) - - def _retry_pattern_format_error_message(self): - return _( - "Unexpected format of Retry Pattern for {}.\n" - "Example of valid format:\n" - "{{1: 300, 5: 600, 10: 1200, 15: 3000}}" - ).format(self.name) - - @api.constrains("retry_pattern") - def _check_retry_pattern(self): - for record in self: - retry_pattern = record.retry_pattern - if not retry_pattern: - continue - - all_values = list(retry_pattern) + list(retry_pattern.values()) - for value in all_values: - try: - int(value) - except ValueError: - raise exceptions.UserError( - record._retry_pattern_format_error_message() - ) - - def _related_action_format_error_message(self): - return _( - "Unexpected format of Related Action for {}.\n" - "Example of valid format:\n" - '{{"enable": True, "func_name": "related_action_foo",' - ' "kwargs" {{"limit": 10}}}}' - ).format(self.name) - - @api.constrains("related_action") - def _check_related_action(self): - valid_keys = ("enable", "func_name", "kwargs") - for record in self: - related_action = record.related_action - if not related_action: - continue - - if any(key not in valid_keys for key in related_action): - raise exceptions.UserError( - record._related_action_format_error_message() - ) - - @api.model_create_multi - def create(self, vals_list): - records = self.browse() - if self.env.context.get("install_mode"): - # installing a module that creates a job function: rebinds the record - # to an existing one (likely we already had the job function created by - # the @job decorator previously) - new_vals_list = [] - for vals in vals_list: - name = vals.get("name") - if name: - existing = self.search([("name", "=", name)], limit=1) - if existing: - if not existing.get_metadata()[0].get("noupdate"): - existing.write(vals) - records |= existing - continue - new_vals_list.append(vals) - vals_list = new_vals_list - records |= super().create(vals_list) - self.clear_caches() - return records - - def write(self, values): - res = super().write(values) - self.clear_caches() - return res - - def unlink(self): - res = super().unlink() - self.clear_caches() - return res - - # TODO deprecated by :job-no-decorator: - def _register_job(self, model, job_method): - func_name = self.job_function_name(model._name, job_method.__name__) - if not self.search_count([('name', '=', func_name)]): - channel = self._find_or_create_channel(job_method.default_channel) - self.create({'name': func_name, 'channel_id': channel.id}) diff --git a/queue_job/models/queue_job_channel.py b/queue_job/models/queue_job_channel.py new file mode 100644 index 0000000000..374e7417f7 --- /dev/null +++ b/queue_job/models/queue_job_channel.py @@ -0,0 +1,94 @@ +# Copyright 2013-2020 Camptocamp SA +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + + +from odoo import _, api, exceptions, fields, models + + +class QueueJobChannel(models.Model): + _name = "queue.job.channel" + _description = "Job Channels" + + name = fields.Char() + complete_name = fields.Char( + compute="_compute_complete_name", store=True, readonly=True + ) + parent_id = fields.Many2one( + comodel_name="queue.job.channel", string="Parent Channel", ondelete="restrict" + ) + job_function_ids = fields.One2many( + comodel_name="queue.job.function", + inverse_name="channel_id", + string="Job Functions", + ) + removal_interval = fields.Integer( + default=lambda self: self.env["queue.job"]._removal_interval, required=True + ) + + _sql_constraints = [ + ("name_uniq", "unique(complete_name)", "Channel complete name must be unique") + ] + + @api.depends("name", "parent_id.complete_name") + def _compute_complete_name(self): + for record in self: + if not record.name: + complete_name = "" # new record + elif record.parent_id: + complete_name = ".".join([record.parent_id.complete_name, record.name]) + else: + complete_name = record.name + record.complete_name = complete_name + + @api.constrains("parent_id", "name") + def parent_required(self): + for record in self: + if record.name != "root" and not record.parent_id: + raise exceptions.ValidationError(_("Parent channel required.")) + + @api.model_create_multi + def create(self, vals_list): + records = self.browse() + if self.env.context.get("install_mode"): + # installing a module that creates a channel: rebinds the channel + # to an existing one (likely we already had the channel created by + # the @job decorator previously) + new_vals_list = [] + for vals in vals_list: + name = vals.get("name") + parent_id = vals.get("parent_id") + if name and parent_id: + existing = self.search( + [("name", "=", name), ("parent_id", "=", parent_id)] + ) + if existing: + if not existing.get_metadata()[0].get("noupdate"): + existing.write(vals) + records |= existing + continue + new_vals_list.append(vals) + vals_list = new_vals_list + records |= super().create(vals_list) + return records + + def write(self, values): + for channel in self: + if ( + not self.env.context.get("install_mode") + and channel.name == "root" + and ("name" in values or "parent_id" in values) + ): + raise exceptions.UserError(_("Cannot change the root channel")) + return super().write(values) + + def unlink(self): + for channel in self: + if channel.name == "root": + raise exceptions.UserError(_("Cannot remove the root channel")) + return super().unlink() + + def name_get(self): + result = [] + for record in self: + result.append((record.id, record.complete_name)) + return result diff --git a/queue_job/models/queue_job_function.py b/queue_job/models/queue_job_function.py new file mode 100644 index 0000000000..47514251e1 --- /dev/null +++ b/queue_job/models/queue_job_function.py @@ -0,0 +1,274 @@ +# Copyright 2013-2020 Camptocamp SA +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import ast +import logging +import re +from collections import namedtuple + +from odoo import _, api, exceptions, fields, models, tools + +from ..fields import JobSerialized + +_logger = logging.getLogger(__name__) + + +regex_job_function_name = re.compile(r"^<([0-9a-z_\.]+)>\.([0-9a-zA-Z_]+)$") + + +class QueueJobFunction(models.Model): + _name = "queue.job.function" + _description = "Job Functions" + _log_access = False + + JobConfig = namedtuple( + "JobConfig", + "channel " + "retry_pattern " + "related_action_enable " + "related_action_func_name " + "related_action_kwargs " + "job_function_id ", + ) + + def _default_channel(self): + return self.env.ref("queue_job.channel_root") + + name = fields.Char( + compute="_compute_name", + inverse="_inverse_name", + index=True, + store=True, + ) + + # model and method should be required, but the required flag doesn't + # let a chance to _inverse_name to be executed + model_id = fields.Many2one( + comodel_name="ir.model", string="Model", ondelete="cascade" + ) + method = fields.Char() + + channel_id = fields.Many2one( + comodel_name="queue.job.channel", + string="Channel", + required=True, + default=lambda r: r._default_channel(), + ) + channel = fields.Char(related="channel_id.complete_name", store=True, readonly=True) + retry_pattern = JobSerialized(string="Retry Pattern (serialized)", base_type=dict) + edit_retry_pattern = fields.Text( + string="Retry Pattern", + compute="_compute_edit_retry_pattern", + inverse="_inverse_edit_retry_pattern", + help="Pattern expressing from the count of retries on retryable errors," + " the number of of seconds to postpone the next execution.\n" + "Example: {1: 10, 5: 20, 10: 30, 15: 300}.\n" + "See the module description for details.", + ) + related_action = JobSerialized(string="Related Action (serialized)", base_type=dict) + edit_related_action = fields.Text( + string="Related Action", + compute="_compute_edit_related_action", + inverse="_inverse_edit_related_action", + help="The action when the button *Related Action* is used on a job. " + "The default action is to open the view of the record related " + "to the job. Configured as a dictionary with optional keys: " + "enable, func_name, kwargs.\n" + "See the module description for details.", + ) + + @api.depends("model_id.model", "method") + def _compute_name(self): + for record in self: + if not (record.model_id and record.method): + record.name = "" + continue + record.name = self.job_function_name(record.model_id.model, record.method) + + def _inverse_name(self): + groups = regex_job_function_name.match(self.name) + if not groups: + raise exceptions.UserError(_("Invalid job function: {}").format(self.name)) + model_name = groups.group(1) + method = groups.group(2) + model = self.env["ir.model"].search([("model", "=", model_name)], limit=1) + if not model: + raise exceptions.UserError(_("Model {} not found").format(model_name)) + self.model_id = model.id + self.method = method + + @api.depends("retry_pattern") + def _compute_edit_retry_pattern(self): + for record in self: + retry_pattern = record._parse_retry_pattern() + record.edit_retry_pattern = str(retry_pattern) + + def _inverse_edit_retry_pattern(self): + try: + self.retry_pattern = ast.literal_eval(self.edit_retry_pattern or "{}") + except (ValueError, TypeError): + raise exceptions.UserError(self._retry_pattern_format_error_message()) + + @api.depends("related_action") + def _compute_edit_related_action(self): + for record in self: + record.edit_related_action = str(record.related_action) + + def _inverse_edit_related_action(self): + try: + self.related_action = ast.literal_eval(self.edit_related_action or "{}") + except (ValueError, TypeError): + raise exceptions.UserError(self._related_action_format_error_message()) + + @staticmethod + def job_function_name(model_name, method_name): + return "<{}>.{}".format(model_name, method_name) + + @api.model + def _find_or_create_channel(self, channel_path): + channel_model = self.env['queue.job.channel'] + parts = channel_path.split('.') + parts.reverse() + channel_name = parts.pop() + assert channel_name == 'root', "A channel path starts with 'root'" + # get the root channel + channel = channel_model.search([('name', '=', channel_name)]) + while parts: + channel_name = parts.pop() + parent_channel = channel + channel = channel_model.search([ + ('name', '=', channel_name), + ('parent_id', '=', parent_channel.id), + ], limit=1) + if not channel: + channel = channel_model.create({ + 'name': channel_name, + 'parent_id': parent_channel.id, + }) + return channel + + def job_default_config(self): + return self.JobConfig( + channel="root", + retry_pattern={}, + related_action_enable=True, + related_action_func_name=None, + related_action_kwargs={}, + job_function_id=None, + ) + + def _parse_retry_pattern(self): + try: + # as json can't have integers as keys and the field is stored + # as json, convert back to int + retry_pattern = { + int(try_count): postpone_seconds + for try_count, postpone_seconds in self.retry_pattern.items() + } + except ValueError: + _logger.error( + "Invalid retry pattern for job function %s," + " keys could not be parsed as integers, fallback" + " to the default retry pattern.", + self.name, + ) + retry_pattern = {} + return retry_pattern + + @tools.ormcache("name") + def job_config(self, name): + config = self.search([("name", "=", name)], limit=1) + if not config: + return self.job_default_config() + retry_pattern = config._parse_retry_pattern() + return self.JobConfig( + channel=config.channel, + retry_pattern=retry_pattern, + related_action_enable=config.related_action.get("enable", True), + related_action_func_name=config.related_action.get("func_name"), + related_action_kwargs=config.related_action.get("kwargs"), + job_function_id=config.id, + ) + + def _retry_pattern_format_error_message(self): + return _( + "Unexpected format of Retry Pattern for {}.\n" + "Example of valid format:\n" + "{{1: 300, 5: 600, 10: 1200, 15: 3000}}" + ).format(self.name) + + @api.constrains("retry_pattern") + def _check_retry_pattern(self): + for record in self: + retry_pattern = record.retry_pattern + if not retry_pattern: + continue + + all_values = list(retry_pattern) + list(retry_pattern.values()) + for value in all_values: + try: + int(value) + except ValueError: + raise exceptions.UserError( + record._retry_pattern_format_error_message() + ) + + def _related_action_format_error_message(self): + return _( + "Unexpected format of Related Action for {}.\n" + "Example of valid format:\n" + '{{"enable": True, "func_name": "related_action_foo",' + ' "kwargs" {{"limit": 10}}}}' + ).format(self.name) + + @api.constrains("related_action") + def _check_related_action(self): + valid_keys = ("enable", "func_name", "kwargs") + for record in self: + related_action = record.related_action + if not related_action: + continue + + if any(key not in valid_keys for key in related_action): + raise exceptions.UserError( + record._related_action_format_error_message() + ) + + @api.model_create_multi + def create(self, vals_list): + records = self.browse() + if self.env.context.get("install_mode"): + # installing a module that creates a job function: rebinds the record + # to an existing one (likely we already had the job function created by + # the @job decorator previously) + new_vals_list = [] + for vals in vals_list: + name = vals.get("name") + if name: + existing = self.search([("name", "=", name)], limit=1) + if existing: + if not existing.get_metadata()[0].get("noupdate"): + existing.write(vals) + records |= existing + continue + new_vals_list.append(vals) + vals_list = new_vals_list + records |= super().create(vals_list) + self.clear_caches() + return records + + def write(self, values): + res = super().write(values) + self.clear_caches() + return res + + def unlink(self): + res = super().unlink() + self.clear_caches() + return res + + def _register_job(self, model, job_method): + func_name = self.job_function_name(model._name, job_method.__name__) + if not self.search_count([('name', '=', func_name)]): + channel = self._find_or_create_channel(job_method.default_channel) + self.create({'name': func_name, 'channel_id': channel.id}) diff --git a/queue_job/readme/USAGE.rst b/queue_job/readme/USAGE.rst index 796764e4b0..d4a2c181e1 100644 --- a/queue_job/readme/USAGE.rst +++ b/queue_job/readme/USAGE.rst @@ -240,13 +240,13 @@ Based on this configuration, we can tell that: When you are developing (ie: connector modules) you might want to bypass the queue job and run your code immediately. -To do so you can set `TEST_QUEUE_JOB_NO_DELAY=1` in your enviroment. +To do so you can set `QUEUE_JOB__NO_DELAY=1` in your enviroment. **Bypass jobs in tests** When writing tests on job-related methods is always tricky to deal with delayed recordsets. To make your testing life easier -you can set `test_queue_job_no_delay=True` in the context. +you can set `queue_job__no_delay=True` in the context. Tip: you can do this at test case level like this @@ -257,7 +257,7 @@ Tip: you can do this at test case level like this super().setUpClass() cls.env = cls.env(context=dict( cls.env.context, - test_queue_job_no_delay=True, # no jobs thanks + queue_job__no_delay=True, # no jobs thanks )) Then all your tests execute the job methods synchronously @@ -356,7 +356,7 @@ If you prefer, you can still test the whole thing in a single test, by calling When you are developing (ie: connector modules) you might want to bypass the queue job and run your code immediately. -To do so you can set ``TEST_QUEUE_JOB_NO_DELAY=1`` in your environment. +To do so you can set ``QUEUE_JOB__NO_DELAY=1`` in your environment. .. WARNING:: Do not do this in production @@ -364,7 +364,7 @@ To do so you can set ``TEST_QUEUE_JOB_NO_DELAY=1`` in your environment. You should use ``trap_jobs``, really, but if for any reason you could not use it, and still need to have job methods executed synchronously in your tests, you can -do so by setting ``test_queue_job_no_delay=True`` in the context. +do so by setting ``queue_job__no_delay=True`` in the context. Tip: you can do this at test case level like this @@ -375,7 +375,7 @@ Tip: you can do this at test case level like this super().setUpClass() cls.env = cls.env(context=dict( cls.env.context, - test_queue_job_no_delay=True, # no jobs thanks + queue_job__no_delay=True, # no jobs thanks )) Then all your tests execute the job methods synchronously without delaying any @@ -385,7 +385,7 @@ In tests you'll have to mute the logger like: @mute_logger('odoo.addons.queue_job.models.base') -.. NOTE:: in graphs of jobs, the ``test_queue_job_no_delay`` context key must be in at +.. NOTE:: in graphs of jobs, the ``queue_job__no_delay`` context key must be in at least one job's env of the graph for the whole graph to be executed synchronously diff --git a/queue_job/tests/common.py b/queue_job/tests/common.py index 92f0d1facf..fbe6589429 100644 --- a/queue_job/tests/common.py +++ b/queue_job/tests/common.py @@ -289,7 +289,7 @@ def __repr__(self): class JobCounter: def __init__(self, env): - super(JobCounter, self).__init__() + super().__init__() self.env = env self.existing = self.search_all() diff --git a/queue_job/tests/test_model_job_channel.py b/queue_job/tests/test_model_job_channel.py index 1a8e403c92..5a8c041806 100644 --- a/queue_job/tests/test_model_job_channel.py +++ b/queue_job/tests/test_model_job_channel.py @@ -11,11 +11,9 @@ class TestJobChannel(common.TransactionCase): def setUp(self): - super(TestJobChannel, self).setUp() - self.Channel = self.env['queue.job.channel'] - self.root_channel = self.Channel.search( - [('name', '=', 'root')] - ) + super().setUp() + self.Channel = self.env["queue.job.channel"] + self.root_channel = self.Channel.search([("name", "=", "root")]) def test_channel_new(self): channel = self.Channel.new() diff --git a/queue_job/utils.py b/queue_job/utils.py new file mode 100644 index 0000000000..31644fc211 --- /dev/null +++ b/queue_job/utils.py @@ -0,0 +1,45 @@ +# Copyright 2023 Camptocamp +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +import logging +import os + +from odoo.tools import config + +_logger = logging.getLogger(__name__) + + +def must_run_without_delay(env): + """Retrun true if jobs have to run immediately. + + :param env: `odoo.api.Environment` instance + """ + # TODO: drop in v17 + if os.getenv("TEST_QUEUE_JOB_NO_DELAY"): + _logger.warning( + "`TEST_QUEUE_JOB_NO_DELAY` env var found. NO JOB scheduled. " + "Note that this key is deprecated: please use `QUEUE_JOB__NO_DELAY`" + ) + return True + + if os.getenv("QUEUE_JOB__NO_DELAY"): + _logger.warning("`QUEUE_JOB__NO_DELAY` env var found. NO JOB scheduled.") + return True + + # TODO: drop in v17 + deprecated_keys = ("_job_force_sync", "test_queue_job_no_delay") + for key in deprecated_keys: + if env.context.get(key): + _logger.warning( + "`%s` ctx key found. NO JOB scheduled. " + "Note that this key is deprecated: please use `queue_job__no_delay`", + key, + ) + return True + + if env.context.get("queue_job__no_delay"): + if config.get("test_enable"): + _logger.info("`queue_job__no_delay` ctx key found. NO JOB scheduled.") + else: + _logger.warning("`queue_job__no_delay` ctx key found. NO JOB scheduled.") + return True diff --git a/queue_job/views/queue_job_channel_views.xml b/queue_job/views/queue_job_channel_views.xml new file mode 100644 index 0000000000..0841a2514e --- /dev/null +++ b/queue_job/views/queue_job_channel_views.xml @@ -0,0 +1,58 @@ + + + + + queue.job.channel.form + queue.job.channel + +
+ + + + + + + + + +
+
+
+ + + queue.job.channel.tree + queue.job.channel + + + + + + + + + queue.job.channel.search + queue.job.channel + + + + + + + + + + + Channels + queue.job.channel + tree,form + {} + + + +
diff --git a/queue_job/views/queue_job_function_views.xml b/queue_job/views/queue_job_function_views.xml new file mode 100644 index 0000000000..a6e2ce402c --- /dev/null +++ b/queue_job/views/queue_job_function_views.xml @@ -0,0 +1,58 @@ + + + + + queue.job.function.form + queue.job.function + +
+ + + + + + + + +
+
+
+ + + queue.job.function.tree + queue.job.function + + + + + + + + + + queue.job.function.search + queue.job.function + + + + + + + + + + + + + Job Functions + queue.job.function + tree,form + {} + + + +
diff --git a/queue_job/views/queue_job_menus.xml b/queue_job/views/queue_job_menus.xml new file mode 100644 index 0000000000..d288d7c0b9 --- /dev/null +++ b/queue_job/views/queue_job_menus.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + diff --git a/queue_job/views/queue_job_views.xml b/queue_job/views/queue_job_views.xml index 9a6a3b30b4..3548a18bae 100644 --- a/queue_job/views/queue_job_views.xml +++ b/queue_job/views/queue_job_views.xml @@ -136,27 +136,6 @@ - - queue.job.pivot - queue.job - - - - - - - - - - queue.job.graph - queue.job - - - - - - - queue.job.search @@ -226,207 +205,4 @@ - - - Requeue Jobs - queue.requeue.job - -
- - - - -
-
-
- - - Set Jobs to Done - queue.jobs.to.done - -
- - - - -
-
-
- - - Cancel Jobs - queue.jobs.to.cancelled - -
- - - - -
-
-
- - - Requeue Jobs - queue.requeue.job - form - form - - new - - - - - Set jobs to done - queue.jobs.to.done - form - form - - new - - - - - Cancel jobs - queue.jobs.to.cancelled - form - form - - new - - - - - queue.job.channel.form - queue.job.channel - -
- - - - - - - - - -
-
-
- - - queue.job.channel.tree - queue.job.channel - - - - - - - - - queue.job.channel.search - queue.job.channel - - - - - - - - - - - Channels - queue.job.channel - form - tree,form - {} - - - - - queue.job.function.form - queue.job.function - -
- - - - - - - - -
-
-
- - - queue.job.function.tree - queue.job.function - - - - - - - - - - queue.job.function.search - queue.job.function - - - - - - - - - - - - - Job Functions - queue.job.function - form - tree,form - {} - - - - - - - - - - - - - diff --git a/queue_job/wizards/__init__.py b/queue_job/wizards/__init__.py new file mode 100644 index 0000000000..06c0bd8572 --- /dev/null +++ b/queue_job/wizards/__init__.py @@ -0,0 +1,3 @@ +from . import queue_requeue_job +from . import queue_jobs_to_done +from . import queue_jobs_to_cancelled diff --git a/queue_job/wizards/queue_jobs_to_cancelled.py b/queue_job/wizards/queue_jobs_to_cancelled.py new file mode 100644 index 0000000000..9e73374ebd --- /dev/null +++ b/queue_job/wizards/queue_jobs_to_cancelled.py @@ -0,0 +1,17 @@ +# Copyright 2013-2020 Camptocamp SA +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +from odoo import models + + +class SetJobsToCancelled(models.TransientModel): + _inherit = "queue.requeue.job" + _name = "queue.jobs.to.cancelled" + _description = "Cancel all selected jobs" + + def set_cancelled(self): + jobs = self.job_ids.filtered( + lambda x: x.state in ("pending", "failed", "enqueued") + ) + jobs.button_cancelled() + return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_jobs_to_cancelled_views.xml b/queue_job/wizards/queue_jobs_to_cancelled_views.xml new file mode 100644 index 0000000000..da3e7a077c --- /dev/null +++ b/queue_job/wizards/queue_jobs_to_cancelled_views.xml @@ -0,0 +1,34 @@ + + + + + Cancel Jobs + queue.jobs.to.cancelled + +
+ + + +
+
+
+
+
+ + + Cancel jobs + queue.jobs.to.cancelled + form + + new + + + +
diff --git a/queue_job/wizards/queue_jobs_to_done.py b/queue_job/wizards/queue_jobs_to_done.py new file mode 100644 index 0000000000..ff1366ffed --- /dev/null +++ b/queue_job/wizards/queue_jobs_to_done.py @@ -0,0 +1,15 @@ +# Copyright 2013-2020 Camptocamp SA +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +from odoo import models + + +class SetJobsToDone(models.TransientModel): + _inherit = "queue.requeue.job" + _name = "queue.jobs.to.done" + _description = "Set all selected jobs to done" + + def set_done(self): + jobs = self.job_ids + jobs.button_done() + return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_jobs_to_done_views.xml b/queue_job/wizards/queue_jobs_to_done_views.xml new file mode 100644 index 0000000000..fde48ab3dd --- /dev/null +++ b/queue_job/wizards/queue_jobs_to_done_views.xml @@ -0,0 +1,34 @@ + + + + + Set Jobs to Done + queue.jobs.to.done + +
+ + + +
+
+
+
+
+ + + Set jobs to done + queue.jobs.to.done + form + + new + + + +
diff --git a/queue_job/wizards/queue_requeue_job.py b/queue_job/wizards/queue_requeue_job.py new file mode 100644 index 0000000000..67d2ffcbdc --- /dev/null +++ b/queue_job/wizards/queue_requeue_job.py @@ -0,0 +1,25 @@ +# Copyright 2013-2020 Camptocamp SA +# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) + +from odoo import fields, models + + +class QueueRequeueJob(models.TransientModel): + _name = "queue.requeue.job" + _description = "Wizard to requeue a selection of jobs" + + def _default_job_ids(self): + res = False + context = self.env.context + if context.get("active_model") == "queue.job" and context.get("active_ids"): + res = context["active_ids"] + return res + + job_ids = fields.Many2many( + comodel_name="queue.job", string="Jobs", default=lambda r: r._default_job_ids() + ) + + def requeue(self): + jobs = self.job_ids + jobs.requeue() + return {"type": "ir.actions.act_window_close"} diff --git a/queue_job/wizards/queue_requeue_job_views.xml b/queue_job/wizards/queue_requeue_job_views.xml new file mode 100644 index 0000000000..9a4ca935e8 --- /dev/null +++ b/queue_job/wizards/queue_requeue_job_views.xml @@ -0,0 +1,34 @@ + + + + + Requeue Jobs + queue.requeue.job + +
+ + + +
+
+
+
+
+ + + Requeue Jobs + queue.requeue.job + form + + new + + + +
diff --git a/test_queue_job/tests/test_delay_mocks.py b/test_queue_job/tests/test_delay_mocks.py index 90b55749ba..f05e94cf09 100644 --- a/test_queue_job/tests/test_delay_mocks.py +++ b/test_queue_job/tests/test_delay_mocks.py @@ -181,8 +181,8 @@ def test_mock_with_delay(self): self.assertEqual(delay_args, (1,)) self.assertDictEqual(delay_kwargs, {"foo": 2}) - @mute_logger('odoo.addons.queue_job.models.base') - @mock.patch.dict(os.environ, {"TEST_QUEUE_JOB_NO_DELAY": "1"}) + @mute_logger("odoo.addons.queue_job.utils") + @mock.patch.dict(os.environ, {"QUEUE_JOB__NO_DELAY": "1"}) def test_delay_graph_direct_exec_env_var(self): node = Delayable(self.env["test.queue.job"]).create_ir_logging( "test_delay_graph_direct_exec 1" @@ -205,15 +205,11 @@ def test_delay_graph_direct_exec_env_var(self): self.assertEqual(logs[0].message, "test_delay_graph_direct_exec 2") self.assertEqual(logs[1].message, "test_delay_graph_direct_exec 1") - @mute_logger('odoo.addons.queue_job.models.base') + @mute_logger("odoo.addons.queue_job.utils") def test_delay_graph_direct_exec_context_key(self): node = Delayable( - self.env["test.queue.job"].with_context( - test_queue_job_no_delay=True - ) - ).create_ir_logging( - "test_delay_graph_direct_exec 1" - ) + self.env["test.queue.job"].with_context(queue_job__no_delay=True) + ).create_ir_logging("test_delay_graph_direct_exec 1") node2 = Delayable(self.env["test.queue.job"]).create_ir_logging( "test_delay_graph_direct_exec 2" ) @@ -232,8 +228,8 @@ def test_delay_graph_direct_exec_context_key(self): self.assertEqual(logs[0].message, "test_delay_graph_direct_exec 2") self.assertEqual(logs[1].message, "test_delay_graph_direct_exec 1") - @mute_logger('odoo.addons.queue_job.models.base') - @mock.patch.dict(os.environ, {"TEST_QUEUE_JOB_NO_DELAY": "1"}) + @mute_logger("odoo.addons.queue_job.utils") + @mock.patch.dict(os.environ, {"QUEUE_JOB__NO_DELAY": "1"}) def test_delay_with_delay_direct_exec_env_var(self): model = self.env["test.queue.job"] model.with_delay().create_ir_logging("test_delay_graph_direct_exec 1") @@ -248,11 +244,9 @@ def test_delay_with_delay_direct_exec_env_var(self): self.assertEqual(len(logs), 1) self.assertEqual(logs[0].message, "test_delay_graph_direct_exec 1") - @mute_logger('odoo.addons.queue_job.models.base') + @mute_logger("odoo.addons.queue_job.utils") def test_delay_with_delay_direct_exec_context_key(self): - model = self.env["test.queue.job"].with_context( - test_queue_job_no_delay=True - ) + model = self.env["test.queue.job"].with_context(queue_job__no_delay=True) model.with_delay().create_ir_logging("test_delay_graph_direct_exec 1") # jobs are executed directly logs = self.env["ir.logging"].search( diff --git a/test_queue_job/tests/test_job_auto_delay.py b/test_queue_job/tests/test_job_auto_delay.py index 5549fc7487..876394a056 100644 --- a/test_queue_job/tests/test_job_auto_delay.py +++ b/test_queue_job/tests/test_job_auto_delay.py @@ -32,11 +32,17 @@ def test_auto_delay_inside_job(self): def test_auto_delay_force_sync(self): """method forced to run synchronously""" - result = ( - self.env["test.queue.job"] - .with_context(_job_force_sync=True) - .delay_me(1, kwarg=2) + with self.assertLogs(level="WARNING") as log_catcher: + result = ( + self.env["test.queue.job"] + .with_context(_job_force_sync=True) + .delay_me(1, kwarg=2) + ) + self.assertEqual( + len(log_catcher.output), 2, "Exactly two warnings (one for each job) should" + " be logged" ) + self.assertIn(" ctx key found. NO JOB scheduled. ", log_catcher.output[0]) self.assertTrue(result, (1, 2)) def test_auto_delay_context_key_set(self):