Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restyle Add setting CELERY_HAYSTACK_IGNORE_RESULT #76

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions celery_haystack/conf.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from appconf import AppConf
from django.conf import settings # noqa
from django.core.exceptions import ImproperlyConfigured
from haystack import constants
from haystack.management.commands import update_index as cmd
from appconf import AppConf


class CeleryHaystack(AppConf):
Expand All @@ -15,11 +15,13 @@ class CeleryHaystack(AppConf):
#: The number of retries that are done
MAX_RETRIES = 1
#: The default Celery task class
DEFAULT_TASK = 'celery_haystack.tasks.CeleryHaystackSignalHandler'
DEFAULT_TASK = "celery_haystack.tasks.CeleryHaystackSignalHandler"
#: The name of the celery queue to use, or None for default
QUEUE = None
#: Whether the task should be handled transaction safe
TRANSACTION_SAFE = True
#: Whether the task results should be ignored
IGNORE_RESULT = False

#: The batch size used by the CeleryHaystackUpdateIndex task
COMMAND_BATCH_SIZE = None
Expand All @@ -35,26 +37,29 @@ class CeleryHaystack(AppConf):
COMMAND_VERBOSITY = 1

def configure_default_alias(self, value):
return value or getattr(constants, 'DEFAULT_ALIAS', None)
return value or getattr(constants, "DEFAULT_ALIAS", None)

def configure_command_batch_size(self, value):
return value or getattr(cmd, 'DEFAULT_BATCH_SIZE', None)
return value or getattr(cmd, "DEFAULT_BATCH_SIZE", None)

def configure_command_age(self, value):
return value or getattr(cmd, 'DEFAULT_AGE', None)
return value or getattr(cmd, "DEFAULT_AGE", None)

def configure(self):
data = {}
for name, value in self.configured_data.items():
if name in ('RETRY_DELAY', 'MAX_RETRIES',
'COMMAND_WORKERS', 'COMMAND_VERBOSITY'):
if name in (
"RETRY_DELAY",
"MAX_RETRIES",
"COMMAND_WORKERS",
"COMMAND_VERBOSITY",
):
value = int(value)
data[name] = value
return data


signal_processor = getattr(settings, 'HAYSTACK_SIGNAL_PROCESSOR', None)

signal_processor = getattr(settings, "HAYSTACK_SIGNAL_PROCESSOR", None)

if signal_processor is None:
raise ImproperlyConfigured("When using celery-haystack with Haystack 2.X "
Expand Down
67 changes: 37 additions & 30 deletions celery_haystack/tasks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
from celery.task import Task # noqa
from celery.utils.log import get_task_logger
from django.apps import apps
from django.core.exceptions import ImproperlyConfigured
from django.core.management import call_command
from django.apps import apps

from .conf import settings

from haystack import connections, connection_router
from haystack import connection_router
from haystack import connections
from haystack.exceptions import NotHandled as IndexNotFoundException

from celery.task import Task # noqa
from celery.utils.log import get_task_logger
from .conf import settings

logger = get_task_logger(__name__)

Expand All @@ -17,14 +16,16 @@ class CeleryHaystackSignalHandler(Task):
using = settings.CELERY_HAYSTACK_DEFAULT_ALIAS
max_retries = settings.CELERY_HAYSTACK_MAX_RETRIES
default_retry_delay = settings.CELERY_HAYSTACK_RETRY_DELAY
ignore_result = settings.CELERY_HAYSTACK_IGNORE_RESULT
store_errors_even_if_ignored = True

def split_identifier(self, identifier, **kwargs):
"""
Break down the identifier representing the instance.

Converts 'notes.note.23' into ('notes.note', 23).
"""
bits = identifier.split('.')
bits = identifier.split(".")

if len(bits) < 2:
logger.error("Unable to parse object "
Expand All @@ -33,15 +34,15 @@ def split_identifier(self, identifier, **kwargs):

pk = bits[-1]
# In case Django ever handles full paths...
object_path = '.'.join(bits[:-1])
object_path = ".".join(bits[:-1])
return (object_path, pk)

def get_model_class(self, object_path, **kwargs):
"""
Fetch the model's class in a standarized way.
"""
bits = object_path.split('.')
app_name = '.'.join(bits[:-1])
bits = object_path.split(".")
app_name = ".".join(bits[:-1])
classname = bits[-1]
model_class = apps.get_model(app_name, classname)

Expand All @@ -58,9 +59,11 @@ def get_instance(self, model_class, pk, **kwargs):
try:
instance = model_class._default_manager.get(pk=pk)
except model_class.DoesNotExist:
logger.error("Couldn't load %s.%s.%s. Somehow it went missing?" %
(model_class._meta.app_label.lower(),
model_class._meta.object_name.lower(), pk))
logger.error("Couldn't load %s.%s.%s. Somehow it went missing?" % (
model_class._meta.app_label.lower(),
model_class._meta.object_name.lower(),
pk,
))
except model_class.MultipleObjectsReturned:
logger.error("More than one object with pk %s. Oops?" % pk)
return instance
Expand All @@ -70,7 +73,8 @@ def get_indexes(self, model_class, **kwargs):
Fetch the model's registered ``SearchIndex`` in a standarized way.
"""
try:
using_backends = connection_router.for_write(**{'models': [model_class]})
using_backends = connection_router.for_write(
**{"models": [model_class]})
for using in using_backends:
index_holder = connections[using].get_unified_index()
yield index_holder.get_index(model_class), using
Expand All @@ -93,10 +97,12 @@ def run(self, action, identifier, **kwargs):
# Then get the model class for the object path
model_class = self.get_model_class(object_path, **kwargs)
for current_index, using in self.get_indexes(model_class, **kwargs):
current_index_name = ".".join([current_index.__class__.__module__,
current_index.__class__.__name__])
current_index_name = ".".join([
current_index.__class__.__module__,
current_index.__class__.__name__
])

if action == 'delete':
if action == "delete":
# If the object is gone, we'll use just the identifier
# against the index.
try:
Expand All @@ -105,10 +111,10 @@ def run(self, action, identifier, **kwargs):
logger.exception(exc)
self.retry(exc=exc)
else:
msg = ("Deleted '%s' (with %s)" %
(identifier, current_index_name))
msg = "Deleted '%s' (with %s)" % (identifier,
current_index_name)
logger.debug(msg)
elif action == 'update':
elif action == "update":
# and the instance of the model class with the pk
instance = self.get_instance(model_class, pk, **kwargs)
if instance is None:
Expand All @@ -124,8 +130,8 @@ def run(self, action, identifier, **kwargs):
logger.exception(exc)
self.retry(exc=exc)
else:
msg = ("Updated '%s' (with %s)" %
(identifier, current_index_name))
msg = "Updated '%s' (with %s)" % (identifier,
current_index_name)
logger.debug(msg)
else:
logger.error("Unrecognized action '%s'. Moving on..." % action)
Expand All @@ -137,19 +143,20 @@ class CeleryHaystackUpdateIndex(Task):
A celery task class to be used to call the update_index management
command from Celery.
"""

def run(self, apps=None, **kwargs):
defaults = {
'batchsize': settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE,
'age': settings.CELERY_HAYSTACK_COMMAND_AGE,
'remove': settings.CELERY_HAYSTACK_COMMAND_REMOVE,
'using': [settings.CELERY_HAYSTACK_DEFAULT_ALIAS],
'workers': settings.CELERY_HAYSTACK_COMMAND_WORKERS,
'verbosity': settings.CELERY_HAYSTACK_COMMAND_VERBOSITY,
"batchsize": settings.CELERY_HAYSTACK_COMMAND_BATCH_SIZE,
"age": settings.CELERY_HAYSTACK_COMMAND_AGE,
"remove": settings.CELERY_HAYSTACK_COMMAND_REMOVE,
"using": [settings.CELERY_HAYSTACK_DEFAULT_ALIAS],
"workers": settings.CELERY_HAYSTACK_COMMAND_WORKERS,
"verbosity": settings.CELERY_HAYSTACK_COMMAND_VERBOSITY,
}
defaults.update(kwargs)
if apps is None:
apps = settings.CELERY_HAYSTACK_COMMAND_APPS
# Run the update_index management command
logger.info("Starting update index")
call_command('update_index', *apps, **defaults)
call_command("update_index", *apps, **defaults)
logger.info("Finishing update index")