From 4048e5b189543c32cf3764081c929cf094300d7d Mon Sep 17 00:00:00 2001 From: Dominik George Date: Wed, 18 Nov 2020 22:15:18 +0100 Subject: [PATCH 1/6] Add setting CELERY_HAYSTACK_IGNORE_RESULT If True, results of the signal handler task will not be written to any result store to not produce large amounts of irrelevant results. Errors will still be stored. --- celery_haystack/conf.py | 2 ++ celery_haystack/tasks.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/celery_haystack/conf.py b/celery_haystack/conf.py index 26f278a..c88c488 100644 --- a/celery_haystack/conf.py +++ b/celery_haystack/conf.py @@ -20,6 +20,8 @@ class CeleryHaystack(AppConf): QUEUE = None #: Whether the task should be handled transaction safe TRANSACTION_SAFE = True + #: Whether the task results should be ignored + IGNORE_RESULT = False #: The batch size used by the CeleryHaystackUpdateIndex task COMMAND_BATCH_SIZE = None diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index d8acedd..e2a1ce0 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -17,6 +17,8 @@ class CeleryHaystackSignalHandler(Task): using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS max_retries = settings.CELERY_HAYSTACK_MAX_RETRIES default_retry_delay = settings.CELERY_HAYSTACK_RETRY_DELAY + ignore_result = settings.CELERY_HAYSTACK_IGNORE_RESULT + store_errors_even_if_ignored = True def split_identifier(self, identifier, **kwargs): """ From 0024aa1b15c2ce208c4d17227296eac9fb71abc7 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Wed, 18 Nov 2020 21:18:42 +0000 Subject: [PATCH 2/6] Restyled by autopep8 --- celery_haystack/tasks.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index e2a1ce0..8269c87 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -72,7 +72,8 @@ def get_indexes(self, model_class, **kwargs): Fetch the model's registered ``SearchIndex`` in a standarized way. """ try: - using_backends = connection_router.for_write(**{'models': [model_class]}) + using_backends = connection_router.for_write( + **{'models': [model_class]}) for using in using_backends: index_holder = connections[using].get_unified_index() yield index_holder.get_index(model_class), using @@ -139,6 +140,7 @@ class CeleryHaystackUpdateIndex(Task): A celery task class to be used to call the update_index management command from Celery. """ + def run(self, apps=None, **kwargs): defaults = { 'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE, From 9ba6f71da220efdf2412334b1f37d84918a832dd Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Wed, 18 Nov 2020 21:18:45 +0000 Subject: [PATCH 3/6] Restyled by black --- celery_haystack/conf.py | 28 +++++++++------- celery_haystack/tasks.py | 70 ++++++++++++++++++++++------------------ 2 files changed, 55 insertions(+), 43 deletions(-) diff --git a/celery_haystack/conf.py b/celery_haystack/conf.py index c88c488..baf13c8 100644 --- a/celery_haystack/conf.py +++ b/celery_haystack/conf.py @@ -15,7 +15,7 @@ class CeleryHaystack(AppConf): #: The number of retries that are done MAX_RETRIES = 1 #: The default Celery task class - DEFAULT_TASK = 'celery_haystack.tasks.CeleryHaystackSignalHandler' + DEFAULT_TASK = "celery_haystack.tasks.CeleryHaystackSignalHandler" #: The name of the celery queue to use, or None for default QUEUE = None #: Whether the task should be handled transaction safe @@ -37,29 +37,35 @@ class CeleryHaystack(AppConf): COMMAND_VERBOSITY = 1 def configure_default_alias(self, value): - return value or getattr(constants, 'DEFAULT_ALIAS', None) + return value or getattr(constants, "DEFAULT_ALIAS", None) def configure_command_batch_size(self, value): - return value or getattr(cmd, 'DEFAULT_BATCH_SIZE', None) + return value or getattr(cmd, "DEFAULT_BATCH_SIZE", None) def configure_command_age(self, value): - return value or getattr(cmd, 'DEFAULT_AGE', None) + return value or getattr(cmd, "DEFAULT_AGE", None) def configure(self): data = {} for name, value in self.configured_data.items(): - if name in ('RETRY_DELAY', 'MAX_RETRIES', - 'COMMAND_WORKERS', 'COMMAND_VERBOSITY'): + if name in ( + "RETRY_DELAY", + "MAX_RETRIES", + "COMMAND_WORKERS", + "COMMAND_VERBOSITY", + ): value = int(value) data[name] = value return data -signal_processor = getattr(settings, 'HAYSTACK_SIGNAL_PROCESSOR', None) +signal_processor = getattr(settings, "HAYSTACK_SIGNAL_PROCESSOR", None) if signal_processor is None: - raise ImproperlyConfigured("When using celery-haystack with Haystack 2.X " - "the HAYSTACK_SIGNAL_PROCESSOR setting must be " - "set. Use 'celery_haystack.signals." - "CelerySignalProcessor' as default.") + raise ImproperlyConfigured( + "When using celery-haystack with Haystack 2.X " + "the HAYSTACK_SIGNAL_PROCESSOR setting must be " + "set. Use 'celery_haystack.signals." + "CelerySignalProcessor' as default." + ) diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index 8269c87..1ab4b9b 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -26,30 +26,30 @@ def split_identifier(self, identifier, **kwargs): Converts 'notes.note.23' into ('notes.note', 23). """ - bits = identifier.split('.') + bits = identifier.split(".") if len(bits) < 2: - logger.error("Unable to parse object " - "identifer '%s'. Moving on..." % identifier) + logger.error( + "Unable to parse object " "identifer '%s'. Moving on..." % identifier + ) return (None, None) pk = bits[-1] # In case Django ever handles full paths... - object_path = '.'.join(bits[:-1]) + object_path = ".".join(bits[:-1]) return (object_path, pk) def get_model_class(self, object_path, **kwargs): """ Fetch the model's class in a standarized way. """ - bits = object_path.split('.') - app_name = '.'.join(bits[:-1]) + bits = object_path.split(".") + app_name = ".".join(bits[:-1]) classname = bits[-1] model_class = apps.get_model(app_name, classname) if model_class is None: - raise ImproperlyConfigured("Could not load model '%s'." % - object_path) + raise ImproperlyConfigured("Could not load model '%s'." % object_path) return model_class def get_instance(self, model_class, pk, **kwargs): @@ -60,9 +60,14 @@ def get_instance(self, model_class, pk, **kwargs): try: instance = model_class._default_manager.get(pk=pk) except model_class.DoesNotExist: - logger.error("Couldn't load %s.%s.%s. Somehow it went missing?" % - (model_class._meta.app_label.lower(), - model_class._meta.object_name.lower(), pk)) + logger.error( + "Couldn't load %s.%s.%s. Somehow it went missing?" + % ( + model_class._meta.app_label.lower(), + model_class._meta.object_name.lower(), + pk, + ) + ) except model_class.MultipleObjectsReturned: logger.error("More than one object with pk %s. Oops?" % pk) return instance @@ -72,14 +77,14 @@ def get_indexes(self, model_class, **kwargs): Fetch the model's registered ``SearchIndex`` in a standarized way. """ try: - using_backends = connection_router.for_write( - **{'models': [model_class]}) + using_backends = connection_router.for_write(**{"models": [model_class]}) for using in using_backends: index_holder = connections[using].get_unified_index() yield index_holder.get_index(model_class), using except IndexNotFoundException: - raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." % - model_class) + raise ImproperlyConfigured( + "Couldn't find a SearchIndex for %s." % model_class + ) def run(self, action, identifier, **kwargs): """ @@ -96,10 +101,11 @@ def run(self, action, identifier, **kwargs): # Then get the model class for the object path model_class = self.get_model_class(object_path, **kwargs) for current_index, using in self.get_indexes(model_class, **kwargs): - current_index_name = ".".join([current_index.__class__.__module__, - current_index.__class__.__name__]) + current_index_name = ".".join( + [current_index.__class__.__module__, current_index.__class__.__name__] + ) - if action == 'delete': + if action == "delete": # If the object is gone, we'll use just the identifier # against the index. try: @@ -108,15 +114,16 @@ def run(self, action, identifier, **kwargs): logger.exception(exc) self.retry(exc=exc) else: - msg = ("Deleted '%s' (with %s)" % - (identifier, current_index_name)) + msg = "Deleted '%s' (with %s)" % (identifier, current_index_name) logger.debug(msg) - elif action == 'update': + elif action == "update": # and the instance of the model class with the pk instance = self.get_instance(model_class, pk, **kwargs) if instance is None: - logger.debug("Failed updating '%s' (with %s)" % - (identifier, current_index_name)) + logger.debug( + "Failed updating '%s' (with %s)" + % (identifier, current_index_name) + ) raise ValueError("Couldn't load object '%s'" % identifier) # Call the appropriate handler of the current index and @@ -127,8 +134,7 @@ def run(self, action, identifier, **kwargs): logger.exception(exc) self.retry(exc=exc) else: - msg = ("Updated '%s' (with %s)" % - (identifier, current_index_name)) + msg = "Updated '%s' (with %s)" % (identifier, current_index_name) logger.debug(msg) else: logger.error("Unrecognized action '%s'. Moving on..." % action) @@ -143,17 +149,17 @@ class CeleryHaystackUpdateIndex(Task): def run(self, apps=None, **kwargs): defaults = { - 'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE, - 'age': settings.CELERY_HAYSTACK_COMMAND_AGE, - 'remove': settings.CELERY_HAYSTACK_COMMAND_REMOVE, - 'using': [settings.CELERY_HAYSTACK_DEFAULT_ALIAS], - 'workers': settings.CELERY_HAYSTACK_COMMAND_WORKERS, - 'verbosity': settings.CELERY_HAYSTACK_COMMAND_VERBOSITY, + "batchsize": settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE, + "age": settings.CELERY_HAYSTACK_COMMAND_AGE, + "remove": settings.CELERY_HAYSTACK_COMMAND_REMOVE, + "using": [settings.CELERY_HAYSTACK_DEFAULT_ALIAS], + "workers": settings.CELERY_HAYSTACK_COMMAND_WORKERS, + "verbosity": settings.CELERY_HAYSTACK_COMMAND_VERBOSITY, } defaults.update(kwargs) if apps is None: apps = settings.CELERY_HAYSTACK_COMMAND_APPS # Run the update_index management command logger.info("Starting update index") - call_command('update_index', *apps, **defaults) + call_command("update_index", *apps, **defaults) logger.info("Finishing update index") From 065ed401d6f5faf4ccea9ff245aafe7e8390c81d Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Wed, 18 Nov 2020 21:18:46 +0000 Subject: [PATCH 4/6] Restyled by isort --- celery_haystack/conf.py | 2 +- celery_haystack/tasks.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/celery_haystack/conf.py b/celery_haystack/conf.py index baf13c8..ee6535e 100644 --- a/celery_haystack/conf.py +++ b/celery_haystack/conf.py @@ -1,8 +1,8 @@ +from appconf import AppConf from django.conf import settings # noqa from django.core.exceptions import ImproperlyConfigured from haystack import constants from haystack.management.commands import update_index as cmd -from appconf import AppConf class CeleryHaystack(AppConf): diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index 1ab4b9b..58e3794 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -1,14 +1,12 @@ +from celery.task import Task # noqa +from celery.utils.log import get_task_logger +from django.apps import apps from django.core.exceptions import ImproperlyConfigured from django.core.management import call_command -from django.apps import apps - -from .conf import settings - -from haystack import connections, connection_router +from haystack import connection_router, connections from haystack.exceptions import NotHandled as IndexNotFoundException -from celery.task import Task # noqa -from celery.utils.log import get_task_logger +from .conf import settings logger = get_task_logger(__name__) From 2660ce771ade78b55eed2e3b2e392e51ec9b5be0 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Wed, 18 Nov 2020 21:18:51 +0000 Subject: [PATCH 5/6] Restyled by reorder-python-imports --- celery_haystack/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index 58e3794..697313f 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -3,7 +3,8 @@ from django.apps import apps from django.core.exceptions import ImproperlyConfigured from django.core.management import call_command -from haystack import connection_router, connections +from haystack import connection_router +from haystack import connections from haystack.exceptions import NotHandled as IndexNotFoundException from .conf import settings From 6f2a33ab6cda0e85c58265f2f1e7e566f291de29 Mon Sep 17 00:00:00 2001 From: "Restyled.io" Date: Wed, 18 Nov 2020 21:18:53 +0000 Subject: [PATCH 6/6] Restyled by yapf --- celery_haystack/conf.py | 19 +++++++--------- celery_haystack/tasks.py | 48 +++++++++++++++++++--------------------- 2 files changed, 31 insertions(+), 36 deletions(-) diff --git a/celery_haystack/conf.py b/celery_haystack/conf.py index ee6535e..248cf6e 100644 --- a/celery_haystack/conf.py +++ b/celery_haystack/conf.py @@ -49,10 +49,10 @@ def configure(self): data = {} for name, value in self.configured_data.items(): if name in ( - "RETRY_DELAY", - "MAX_RETRIES", - "COMMAND_WORKERS", - "COMMAND_VERBOSITY", + "RETRY_DELAY", + "MAX_RETRIES", + "COMMAND_WORKERS", + "COMMAND_VERBOSITY", ): value = int(value) data[name] = value @@ -61,11 +61,8 @@ def configure(self): signal_processor = getattr(settings, "HAYSTACK_SIGNAL_PROCESSOR", None) - if signal_processor is None: - raise ImproperlyConfigured( - "When using celery-haystack with Haystack 2.X " - "the HAYSTACK_SIGNAL_PROCESSOR setting must be " - "set. Use 'celery_haystack.signals." - "CelerySignalProcessor' as default." - ) + raise ImproperlyConfigured("When using celery-haystack with Haystack 2.X " + "the HAYSTACK_SIGNAL_PROCESSOR setting must be " + "set. Use 'celery_haystack.signals." + "CelerySignalProcessor' as default.") diff --git a/celery_haystack/tasks.py b/celery_haystack/tasks.py index 697313f..c07af20 100644 --- a/celery_haystack/tasks.py +++ b/celery_haystack/tasks.py @@ -28,9 +28,8 @@ def split_identifier(self, identifier, **kwargs): bits = identifier.split(".") if len(bits) < 2: - logger.error( - "Unable to parse object " "identifer '%s'. Moving on..." % identifier - ) + logger.error("Unable to parse object " + "identifer '%s'. Moving on..." % identifier) return (None, None) pk = bits[-1] @@ -48,7 +47,8 @@ def get_model_class(self, object_path, **kwargs): model_class = apps.get_model(app_name, classname) if model_class is None: - raise ImproperlyConfigured("Could not load model '%s'." % object_path) + raise ImproperlyConfigured("Could not load model '%s'." % + object_path) return model_class def get_instance(self, model_class, pk, **kwargs): @@ -59,14 +59,11 @@ def get_instance(self, model_class, pk, **kwargs): try: instance = model_class._default_manager.get(pk=pk) except model_class.DoesNotExist: - logger.error( - "Couldn't load %s.%s.%s. Somehow it went missing?" - % ( - model_class._meta.app_label.lower(), - model_class._meta.object_name.lower(), - pk, - ) - ) + logger.error("Couldn't load %s.%s.%s. Somehow it went missing?" % ( + model_class._meta.app_label.lower(), + model_class._meta.object_name.lower(), + pk, + )) except model_class.MultipleObjectsReturned: logger.error("More than one object with pk %s. Oops?" % pk) return instance @@ -76,14 +73,14 @@ def get_indexes(self, model_class, **kwargs): Fetch the model's registered ``SearchIndex`` in a standarized way. """ try: - using_backends = connection_router.for_write(**{"models": [model_class]}) + using_backends = connection_router.for_write( + **{"models": [model_class]}) for using in using_backends: index_holder = connections[using].get_unified_index() yield index_holder.get_index(model_class), using except IndexNotFoundException: - raise ImproperlyConfigured( - "Couldn't find a SearchIndex for %s." % model_class - ) + raise ImproperlyConfigured("Couldn't find a SearchIndex for %s." % + model_class) def run(self, action, identifier, **kwargs): """ @@ -100,9 +97,10 @@ def run(self, action, identifier, **kwargs): # Then get the model class for the object path model_class = self.get_model_class(object_path, **kwargs) for current_index, using in self.get_indexes(model_class, **kwargs): - current_index_name = ".".join( - [current_index.__class__.__module__, current_index.__class__.__name__] - ) + current_index_name = ".".join([ + current_index.__class__.__module__, + current_index.__class__.__name__ + ]) if action == "delete": # If the object is gone, we'll use just the identifier @@ -113,16 +111,15 @@ def run(self, action, identifier, **kwargs): logger.exception(exc) self.retry(exc=exc) else: - msg = "Deleted '%s' (with %s)" % (identifier, current_index_name) + msg = "Deleted '%s' (with %s)" % (identifier, + current_index_name) logger.debug(msg) elif action == "update": # and the instance of the model class with the pk instance = self.get_instance(model_class, pk, **kwargs) if instance is None: - logger.debug( - "Failed updating '%s' (with %s)" - % (identifier, current_index_name) - ) + logger.debug("Failed updating '%s' (with %s)" % + (identifier, current_index_name)) raise ValueError("Couldn't load object '%s'" % identifier) # Call the appropriate handler of the current index and @@ -133,7 +130,8 @@ def run(self, action, identifier, **kwargs): logger.exception(exc) self.retry(exc=exc) else: - msg = "Updated '%s' (with %s)" % (identifier, current_index_name) + msg = "Updated '%s' (with %s)" % (identifier, + current_index_name) logger.debug(msg) else: logger.error("Unrecognized action '%s'. Moving on..." % action)