This repository has been archived by the owner on Dec 13, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 30
/
flask_celery.py
269 lines (211 loc) · 11.3 KB
/
flask_celery.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""Celery support for Flask without breaking PyCharm inspections.
https://github.com/Robpol86/Flask-Celery-Helper
https://pypi.python.org/pypi/Flask-Celery-Helper
"""
import hashlib
from datetime import datetime, timedelta
from functools import partial, wraps
from logging import getLogger
from celery import _state, Celery as CeleryClass
__author__ = '@Robpol86'
__license__ = 'MIT'
__version__ = '1.1.0'
class OtherInstanceError(Exception):
    """Raised when a single-instance Celery task is already running (its lock exists and has not timed out)."""
class _LockManager(object):
    """Common plumbing shared by the concrete lock manager classes."""

    def __init__(self, celery_self, timeout, include_args, args, kwargs):
        """May raise NotImplementedError if the Celery backend is not supported.

        :param celery_self: From wrapped() within single_instance(). It is the `self` object specified in a binded
            Celery task definition (implicit first argument of the Celery task when @celery.task(bind=True) is used).
        :param int timeout: Lock's timeout value in seconds.
        :param bool include_args: If single instance should take arguments into account.
        :param iter args: The task instance's args.
        :param dict kwargs: The task instance's kwargs.
        """
        self.celery_self = celery_self
        self.timeout = timeout
        self.include_args = include_args
        self.args = args
        self.kwargs = kwargs
        # Logger name embeds the task identifier so concurrent tasks are distinguishable in logs.
        self.log = getLogger('%s:%s' % (self.__class__.__name__, self.task_identifier))

    @property
    def task_identifier(self):
        """Return the unique identifier (string) of a task instance."""
        identifier = self.celery_self.name
        if not self.include_args:
            return identifier
        # Sort kwargs so the hash is stable regardless of keyword order at the call site.
        sorted_kwargs = [(key, self.kwargs[key]) for key in sorted(self.kwargs)]
        digest = hashlib.md5((str(self.args) + str(sorted_kwargs)).encode('utf-8')).hexdigest()
        return '{0}.args.{1}'.format(identifier, digest)
class _LockManagerRedis(_LockManager):
    """Handle locking/unlocking for Redis backends.

    Uses the Redis client's native lock with a timeout, so a crashed worker's
    lock expires on its own.
    """

    CELERY_LOCK = '_celery.single_instance.{task_id}'

    def __init__(self, celery_self, timeout, include_args, args, kwargs):
        """See _LockManager.__init__ for parameter documentation."""
        super(_LockManagerRedis, self).__init__(celery_self, timeout, include_args, args, kwargs)
        self.lock = None  # Redis lock object; populated in __enter__.

    def __enter__(self):
        """Acquire the Redis lock or raise OtherInstanceError if it is already held."""
        redis_key = self.CELERY_LOCK.format(task_id=self.task_identifier)
        self.lock = self.celery_self.backend.client.lock(redis_key, timeout=self.timeout)
        self.log.debug('Timeout %ds | Redis key %s', self.timeout, redis_key)
        # Non-blocking acquire: fail fast instead of queueing behind the running instance.
        if not self.lock.acquire(blocking=False):
            self.log.debug('Another instance is running.')
            raise OtherInstanceError('Failed to acquire lock, {0} already running.'.format(self.task_identifier))
        self.log.debug('Got lock, running.')

    def __exit__(self, exc_type, *_):
        # OtherInstanceError (or a subclass) means __enter__ never acquired the lock;
        # there is nothing to release. issubclass() also covers subclasses, which the
        # previous equality check missed.
        if exc_type is not None and issubclass(exc_type, OtherInstanceError):
            return
        self.log.debug('Releasing lock.')
        self.lock.release()

    @property
    def is_already_running(self):
        """Return True if lock exists and has not timed out."""
        redis_key = self.CELERY_LOCK.format(task_id=self.task_identifier)
        # redis-py EXISTS returns an int; cast so the documented bool is returned.
        return bool(self.celery_self.backend.client.exists(redis_key))

    def reset_lock(self):
        """Remove the lock regardless of timeout."""
        redis_key = self.CELERY_LOCK.format(task_id=self.task_identifier)
        self.celery_self.backend.client.delete(redis_key)
class _LockManagerDB(_LockManager):
    """Handle locking/unlocking for SQLite/MySQL/PostgreSQL/etc backends.

    Repurposes the backend's group-result table as a lock: a row keyed by the
    task identifier exists while the task runs, and the row's ``date_done``
    timestamp is compared against the timeout to detect stale locks.
    """

    def __init__(self, celery_self, timeout, include_args, args, kwargs):
        """See _LockManager.__init__ for parameter documentation."""
        super(_LockManagerDB, self).__init__(celery_self, timeout, include_args, args, kwargs)
        # Private backend methods fetched via getattr() to avoid protected-member warnings.
        self.save_group = getattr(self.celery_self.backend, '_save_group')
        self.restore_group = getattr(self.celery_self.backend, '_restore_group')
        self.delete_group = getattr(self.celery_self.backend, '_delete_group')

    def __enter__(self):
        """Insert the lock row, or raise OtherInstanceError if a live lock row already exists."""
        self.log.debug('Timeout %ds', self.timeout)
        try:
            self.save_group(self.task_identifier, None)
        except Exception as exc:  # pylint: disable=broad-except
            # A duplicate-key failure means the lock row already exists. The concrete DB
            # driver exception classes are not importable here, so the message text is
            # sniffed instead; anything else is a genuine error and is re-raised.
            if 'IntegrityError' not in str(exc) and 'ProgrammingError' not in str(exc):
                raise
            # NOTE(review): uses naive UTC timestamps; assumes date_done is stored naive
            # UTC by the backend — confirm against the Celery database backend in use.
            difference = datetime.utcnow() - self.restore_group(self.task_identifier)['date_done']
            if difference < timedelta(seconds=self.timeout):
                # Lock row is fresh: another instance really is running.
                self.log.debug('Another instance is running.')
                raise OtherInstanceError('Failed to acquire lock, {0} already running.'.format(self.task_identifier))
            # Lock row is older than the timeout: treat as stale, replace with our own.
            self.log.debug('Timeout expired, stale lock found, releasing lock.')
            self.delete_group(self.task_identifier)
            self.save_group(self.task_identifier, None)
        self.log.debug('Got lock, running.')

    def __exit__(self, exc_type, *_):
        # Failing to get the lock raises OtherInstanceError from __enter__; in that case
        # the lock row belongs to the other instance and must not be deleted here.
        if exc_type == OtherInstanceError:
            # Failed to get lock last time, not releasing.
            return
        self.log.debug('Releasing lock.')
        self.delete_group(self.task_identifier)

    @property
    def is_already_running(self):
        """Return True if lock exists and has not timed out."""
        # restore_group returns None when no row exists; treat that as "not running".
        date_done = (self.restore_group(self.task_identifier) or dict()).get('date_done')
        if not date_done:
            return False
        difference = datetime.utcnow() - date_done
        return difference < timedelta(seconds=self.timeout)

    def reset_lock(self):
        """Remove the lock regardless of timeout."""
        self.delete_group(self.task_identifier)
def _select_manager(backend_name):
    """Select the proper LockManager based on the current backend used by Celery.

    :raise NotImplementedError: If Celery is using an unsupported backend.

    :param str backend_name: Class name of the current Celery backend. Usually value of
        current_app.extensions['celery'].celery.backend.__class__.__name__.

    :return: Class definition object (not instance). One of the _LockManager* classes.
    """
    if backend_name == 'RedisBackend':
        return _LockManagerRedis
    if backend_name == 'DatabaseBackend':
        return _LockManagerDB
    raise NotImplementedError
class _CeleryState(object):
    """Remember the configuration for the (celery, app) tuple. Modeled from SQLAlchemy."""

    def __init__(self, celery, app):
        """Pair the Celery instance with the Flask application it was initialized against."""
        self.app = app
        self.celery = celery
# noinspection PyProtectedMember
class Celery(CeleryClass):
    """Celery extension for Flask applications.

    Involves a hack to allow views and tests importing the celery instance from extensions.py to access the regular
    Celery instance methods. This is done by subclassing celery.Celery and overwriting celery._state._register_app()
    with a lambda/function that does nothing at all.

    That way, on the first super() in this class' __init__(), all of the required instance objects are initialized, but
    the Celery application is not registered. This class will be initialized in extensions.py but at that moment the
    Flask application is not yet available.

    Then, once the Flask application is available, this class' init_app() method will be called, with the Flask
    application as an argument. init_app() will again call celery.Celery.__init__() but this time with the
    celery._state._register_app() restored to its original functionality. in init_app() the actual Celery application is
    initialized like normal.
    """

    def __init__(self, app=None):
        """If app argument provided then initialize celery using application config values.

        If no app argument provided you should do initialization later with init_app method.

        :param app: Flask application instance.
        """
        self.original_register_app = _state._register_app  # Backup Celery app registration function.
        _state._register_app = lambda _: None  # Upon Celery app registration attempt, do nothing.
        super(Celery, self).__init__()
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        """Actual method to read celery settings from app configuration and initialize the celery instance.

        :raise ValueError: If this extension was already registered on the app.

        :param app: Flask application instance.
        """
        _state._register_app = self.original_register_app  # Restore Celery app registration function.
        if not hasattr(app, 'extensions'):
            app.extensions = dict()
        if 'celery' in app.extensions:
            raise ValueError('Already registered extension CELERY.')
        app.extensions['celery'] = _CeleryState(self, app)
        # Instantiate celery and read config.
        super(Celery, self).__init__(app.import_name, broker=app.config['CELERY_BROKER_URL'])
        # Set result backend default.
        if 'CELERY_RESULT_BACKEND' in app.config:
            # NOTE(review): _preconf is a Celery-internal pre-configuration dict —
            # confirm it still exists before upgrading the pinned Celery version.
            self._preconf['CELERY_RESULT_BACKEND'] = app.config['CELERY_RESULT_BACKEND']
        self.conf.update(app.config)
        task_base = self.Task

        # Add Flask app context to celery instance.
        class ContextTask(task_base):
            """Task subclass whose calls run inside the Flask application context."""

            def __call__(self, *_args, **_kwargs):
                with app.app_context():
                    return task_base.__call__(self, *_args, **_kwargs)

        setattr(ContextTask, 'abstract', True)
        setattr(self, 'Task', ContextTask)
def single_instance(func=None, lock_timeout=None, include_args=False):
    """Celery task decorator. Forces the task to have only one running instance at a time.

    Use with binded tasks (@celery.task(bind=True)).

    Modeled after:
    http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html
    http://blogs.it.ox.ac.uk/inapickle/2012/01/05/python-decorators-with-optional-arguments/

    Written by @Robpol86.

    :raise OtherInstanceError: If another instance is already running.

    :param function func: The function to decorate, must be also decorated by @celery.task.
    :param int lock_timeout: Lock timeout in seconds plus five more seconds, in-case the task crashes and fails to
        release the lock. If not specified, the values of the task's soft/hard limits are used. If all else fails,
        timeout will be 5 minutes.
    :param bool include_args: Include the md5 checksum of the arguments passed to the task in the Redis key. This allows
        the same task to run with different arguments, only stopping a task from running if another instance of it is
        running with the same arguments.
    """
    # Decorator used with arguments, e.g. @single_instance(lock_timeout=30):
    # return a decorator that will receive the function later.
    if func is None:
        return partial(single_instance, lock_timeout=lock_timeout, include_args=include_args)

    @wraps(func)
    def wrapped(celery_self, *args, **kwargs):
        """Wrapped Celery task, for single_instance()."""
        # Resolve the lock timeout: explicit argument first, then the task's own
        # soft/hard limits, then the app-wide limits, finally a 5-minute default.
        timeout = lock_timeout
        if not timeout:
            timeout = celery_self.soft_time_limit or celery_self.time_limit
        if not timeout:
            timeout = (celery_self.app.conf.get('CELERYD_TASK_SOFT_TIME_LIMIT')
                       or celery_self.app.conf.get('CELERYD_TASK_TIME_LIMIT'))
        if not timeout:
            timeout = 60 * 5
        manager_class = _select_manager(celery_self.backend.__class__.__name__)
        # Hold the lock only for the duration of the task body.
        with manager_class(celery_self, timeout, include_args, args, kwargs):
            result = func(*args, **kwargs)
        return result

    return wrapped