Skip to content

Commit

Permalink
feat: Add celery task lifecycle logging for Datadog diagnostics (#779)
Browse files Browse the repository at this point in the history
Adds celery as a testing dependency.
  • Loading branch information
timmc-edx authored Aug 22, 2024
1 parent 6be3a73 commit 560a6fb
Show file tree
Hide file tree
Showing 14 changed files with 518 additions and 130 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
~~~~~~~~~~

[4.3.0] - 2024-08-22
~~~~~~~~~~~~~~~~~~~~
Added
-----
* Added celery lifecycle logging for Datadog diagnostics, to be enabled using ``DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS``.

[4.2.0] - 2024-08-13
~~~~~~~~~~~~~~~~~~~~
Fixed
Expand Down
2 changes: 1 addition & 1 deletion edx_arch_experiments/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
A plugin to include applications under development by the architecture team at 2U.
"""

__version__ = '4.2.0'
__version__ = '4.3.0'
116 changes: 116 additions & 0 deletions edx_arch_experiments/datadog_diagnostics/apps.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import logging
import re

from django.apps import AppConfig
from django.conf import settings
Expand Down Expand Up @@ -41,6 +42,112 @@ def shutdown(self, _timeout):
log.error(f"Span created but not finished: {span._pprint()}") # pylint: disable=protected-access


# Per-signal extractors used for Celery task lifecycle logging.
#
# CELERY_SIGNAL_CONFIG maps a Celery signal name to a callable that accepts
# the signal receiver's kwargs dict and returns a minimal dict of info
# (optionally containing 'name' and/or 'id') for tracking task lifecycle.
# Celery signal params vary quite a bit in how they convey this information,
# so each signal is mapped to an explicit extractor rather than relying on
# one set of heuristics to get the task ID and name from all signals.
#
# Docs: https://docs.celeryq.dev/en/stable/userguide/signals.html


def _sender_as_name(kwargs):
    """Use the 'sender' entry itself as the name; no id is extracted."""
    return {'name': kwargs['sender']}


def _task_with_task_id(kwargs):
    """Take the name from the 'task' entry and the id from 'task_id'."""
    task = kwargs['task']
    return {'name': task.name, 'id': kwargs['task_id']}


def _sender_with_request_id(kwargs):
    """Take the name from the 'sender' entry and the id from 'request'."""
    sender = kwargs['sender']
    return {'name': sender.name, 'id': kwargs['request'].id}


def _sender_name_only(kwargs):
    """Take the name from the 'sender' entry; no id is extracted."""
    sender = kwargs['sender']
    return {'name': sender.name}


def _sender_with_task_id(kwargs):
    """Take the name from the 'sender' entry and the id from 'task_id'."""
    sender = kwargs['sender']
    return {'name': sender.name, 'id': kwargs['task_id']}


def _request_name_and_id(kwargs):
    """Take both the name and the id from the 'request' entry."""
    return {'name': kwargs['request'].name, 'id': kwargs['request'].id}


def _direct_name_and_id(kwargs):
    """Take the name and id directly from the 'name' and 'id' entries."""
    return {'name': kwargs['name'], 'id': kwargs['id']}


def _no_info(_kwargs):
    """Extract nothing; always returns an empty dict."""
    return {}


CELERY_SIGNAL_CONFIG = {
    'before_task_publish': _sender_as_name,
    'after_task_publish': _sender_as_name,
    'task_prerun': _task_with_task_id,
    'task_postrun': _task_with_task_id,
    'task_retry': _sender_with_request_id,
    'task_success': _sender_name_only,
    'task_failure': _sender_with_task_id,
    'task_internal_error': _sender_with_task_id,
    'task_received': _request_name_and_id,
    'task_revoked': _sender_with_request_id,
    'task_unknown': _direct_name_and_id,
    'task_rejected': _no_info,
}


def _connect_celery_handler(signal, signal_name, extractor):
    """
    Register one Celery signal receiver that logs every call to the signal.

    This serves as a closure to capture the config (and some state) for one signal.

    If the extractor ever raises, log the error just once and don't try calling it
    again for the remaining life of the process (as it will likely continue failing
    the same way.) Subsequent calls are still logged, just without task details.

    Args:
        signal: Signal instance to register a handler for. Only its
            ``connect(receiver, weak=False)`` method is used here.
        signal_name: Name of signal in Celery signals module (used for logging)
        extractor: Function to take signal receiver's entire kwargs and return
            a dict optionally containing 'id' and 'name' keys.
    """
    # Per-signal flag, flipped permanently the first time the extractor fails.
    errored = False

    def log_celery_signal(**kwargs):
        """Log one call to the signal, with task name/id when extractable."""
        nonlocal errored
        info = None
        if not errored:
            try:
                info = extractor(kwargs)
            except Exception:
                # Deliberately not BaseException: KeyboardInterrupt, SystemExit
                # and similar must keep propagating so process shutdown is not
                # swallowed (or mistaken for a permanently broken extractor).
                errored = True
                log.exception(
                    f"Error while extracting Celery signal info for '{signal_name}'; "
                    "will not attempt for future calls to this signal."
                )

        if info is None:
            extra = "(skipped data extraction)"
        else:
            extra = f"with name={info.get('name')} id={info.get('id')}"
        log.info(f"Celery signal called: '{signal_name}' {extra}")

    # weak=False: the receiver is a closure with no other references, so a
    # weak reference would let it be garbage collected right after connecting.
    signal.connect(log_celery_signal, weak=False)


def connect_celery_logging():
    """
    Set up logging of configured Celery signals.

    Reads the ``DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS`` setting and, for each
    named signal that exists in ``celery.signals`` and has an entry in
    ``CELERY_SIGNAL_CONFIG``, connects a logging receiver. Names that fail
    either lookup are skipped with a warning. A name listed more than once is
    handled only once, so its calls are not logged twice.

    Finishes by logging the list of signal names that were connected.

    Throws if celery is not installed.
    """
    import celery.signals  # pylint: disable=import-outside-toplevel

    # .. setting_name: DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS
    # .. setting_default: ''
    # .. setting_description: Log calls to these Celery signals (signal name as well
    #   as task name and id, if available). Specify as a comma and/or whitespace delimited
    #   list of names from the celery.signals module. Full list of available signals:
    #   "after_task_publish, before_task_publish, task_failure, task_internal_error,
    #   task_postrun, task_prerun, task_received, task_rejected, task_retry,
    #   task_revoked, task_success, task_unknown"
    DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS = getattr(
        settings,
        'DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS',
        ''
    )

    # dict.fromkeys() drops repeated names while keeping first-seen order, so
    # a duplicated entry cannot register a second receiver (which would double
    # every log line for that signal) or emit the same warning twice.
    requested_names = dict.fromkeys(
        re.split(r'[,\s]+', DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS)
    )

    connected_names = []
    for signal_name in requested_names:
        if not signal_name:  # artifacts from splitting
            continue

        signal = getattr(celery.signals, signal_name, None)
        if not signal:
            log.warning(f"Could not connect receiver to unknown Celery signal '{signal_name}'")
            continue

        extractor = CELERY_SIGNAL_CONFIG.get(signal_name)
        if not extractor:
            log.warning(f"Don't know how to extract info for Celery signal '{signal_name}'; ignoring.")
            continue

        _connect_celery_handler(signal, signal_name, extractor)
        connected_names.append(signal_name)

    log.info(f"Logging lifecycle info for these celery signals: {connected_names!r}")


class DatadogDiagnostics(AppConfig):
"""
Django application to log diagnostic information for Datadog.
Expand Down Expand Up @@ -72,3 +179,12 @@ def ready(self):
"Unable to attach MissingSpanProcessor for Datadog diagnostics"
" -- ddtrace module not found."
)

# We think that something related to Celery instrumentation is involved
# in causing trace concatenation in Datadog. DD Support has requested that
# we log lifecycle information of Celery tasks to see if all of the needed
# signals are being emitted for their span construction.
try:
connect_celery_logging()
except BaseException:
log.exception("Unable to subscribe to Celery task signals")
125 changes: 124 additions & 1 deletion edx_arch_experiments/datadog_diagnostics/tests/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
Tests for plugin app.
"""

from unittest.mock import patch
from unittest.mock import call, patch

import celery.signals
from ddtrace import tracer
from django.dispatch import Signal
from django.test import TestCase, override_settings

from .. import apps
Expand Down Expand Up @@ -78,3 +80,124 @@ def test_logging(self, mock_log_error, mock_log_info):

mock_log_info.assert_called_once_with("Spans created = 1; spans finished = 0")
mock_log_error.assert_called_once_with("Span created but not finished: span_id=17")


class TestCeleryLogging(TestCase):
    """
    Tests for celery signal logging.

    While it would be nice to test actual Celery tasks and signals,
    it's difficult to get that all working with unit tests. We'd have
    to use Celery's pytest extra, which provides fixtures, but those
    don't play well with unittest's TestCase classes and even after
    converting to top-level functions things just seemed to hang after
    setup.

    So instead, we'll mock things out at the signal level and just
    ensure that each level of functionality works in isolation.
    """

    @patch('edx_arch_experiments.datadog_diagnostics.apps.log.info')
    def test_default_config_has_no_signals(self, mock_log_info):
        """
        Test that no signals are connected when the setting is not overridden.

        Assumes the test settings do not themselves define
        DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS.
        """
        apps.DatadogDiagnostics('edx_arch_experiments.datadog_diagnostics', apps).ready()
        mock_log_info.assert_called_with("Logging lifecycle info for these celery signals: []")

    @patch('edx_arch_experiments.datadog_diagnostics.apps.log.info')
    def test_registration_maximal(self, mock_log_info):
        """Test that all celery signal names are actually signals."""
        # Sorted so the expected log message below has a stable order.
        all_signal_names = ', '.join(sorted(apps.CELERY_SIGNAL_CONFIG.keys()))
        with override_settings(DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS=all_signal_names):
            apps.DatadogDiagnostics('edx_arch_experiments.datadog_diagnostics', apps).ready()

        mock_log_info.assert_called_with(
            "Logging lifecycle info for these celery signals: ['after_task_publish', "
            "'before_task_publish', 'task_failure', 'task_internal_error', "
            "'task_postrun', 'task_prerun', 'task_received', 'task_rejected', "
            "'task_retry', 'task_revoked', 'task_success', 'task_unknown']"
        )

    # The leading commas in the setting value exercise the skipping of empty
    # names produced by splitting.
    @override_settings(
        DATADOG_DIAGNOSTICS_CELERY_LOG_SIGNALS=',,,task_success, task_unknown, task_rejected, fake_signal'
    )
    @patch('edx_arch_experiments.datadog_diagnostics.apps.log.info')
    @patch('edx_arch_experiments.datadog_diagnostics.apps.log.warning')
    @patch('edx_arch_experiments.datadog_diagnostics.apps._connect_celery_handler')
    def test_register(self, mock_connect, mock_log_warning, mock_log_info):
        """Test that signal connector is *called* as expected."""

        # patch.dict restores the original config contents on exit.
        with patch.dict('edx_arch_experiments.datadog_diagnostics.apps.CELERY_SIGNAL_CONFIG'):
            # Add a bad entry to the config to test the signal lookup path
            apps.CELERY_SIGNAL_CONFIG['fake_signal'] = lambda kwargs: {}
            # Remove a real signal from the config to test the extractor lookup path
            del apps.CELERY_SIGNAL_CONFIG['task_unknown']
            apps.DatadogDiagnostics('edx_arch_experiments.datadog_diagnostics', apps).ready()

        # Only the two names that pass both lookups reach the connector.
        assert mock_connect.call_args_list == [
            call(
                celery.signals.task_success,
                'task_success',
                apps.CELERY_SIGNAL_CONFIG['task_success'],
            ),
            call(
                celery.signals.task_rejected,
                'task_rejected',
                apps.CELERY_SIGNAL_CONFIG['task_rejected'],
            ),
        ]
        # Warnings appear in the order the names are listed in the setting.
        assert mock_log_warning.call_args_list == [
            call("Don't know how to extract info for Celery signal 'task_unknown'; ignoring."),
            call("Could not connect receiver to unknown Celery signal 'fake_signal'"),
        ]
        mock_log_info.assert_called_with(
            "Logging lifecycle info for these celery signals: ['task_success', 'task_rejected']"
        )

    @patch('edx_arch_experiments.datadog_diagnostics.apps.log.info')
    @patch('edx_arch_experiments.datadog_diagnostics.apps.log.exception')
    def test_handler(self, mock_log_exception, mock_log_info):
        """Test that signal connector *behaves* as expected."""
        # Signal A will just do a straightforward data extraction from the kwargs.
        # pylint: disable=protected-access
        apps._connect_celery_handler(
            signal_example_a, 'signal_example_a',
            lambda kwargs: {'name': kwargs['info']['name']},
        )

        # Signal B will have an extractor that goes bad on the 2nd and 3rd calls
        b_called_times = 0

        def b_extractor(kwargs):
            nonlocal b_called_times
            b_called_times += 1
            if b_called_times >= 2:
                raise Exception("oops")

            return {'id': kwargs['the_id']}

        # pylint: disable=protected-access
        apps._connect_celery_handler(signal_example_b, 'signal_example_b', b_extractor)

        # Send to B a few times to show that error logging only happens once
        signal_example_b.send(sender='some_sender', the_id=42)
        signal_example_b.send(sender='some_sender', the_id=42)
        signal_example_b.send(sender='some_sender', the_id=42)
        # And then send to A to show it still works
        signal_example_a.send(
            sender='some_sender', other='whatever', info={'name': "Alice"}, name='not this one',
        )

        mock_log_exception.assert_called_once_with(
            "Error while extracting Celery signal info for 'signal_example_b'; "
            "will not attempt for future calls to this signal."
        )
        # The extractor is not invoked again after its first failure, so the
        # third send never reaches it.
        assert b_called_times == 2
        assert mock_log_info.call_args_list == [
            call("Celery signal called: 'signal_example_b' with name=None id=42"),
            call("Celery signal called: 'signal_example_b' (skipped data extraction)"),
            call("Celery signal called: 'signal_example_b' (skipped data extraction)"),
            call("Celery signal called: 'signal_example_a' with name=Alice id=None"),
        ]


# Stand-in signals (django.dispatch.Signal) that TestCeleryLogging.test_handler
# connects receivers to and sends on, in place of real Celery signals.
signal_example_a = Signal()
signal_example_b = Signal()
14 changes: 7 additions & 7 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
#
asgiref==3.8.1
# via django
attrs==23.2.0
attrs==24.2.0
# via
# jsonschema
# referencing
certifi==2024.7.4
# via requests
cffi==1.16.0
cffi==1.17.0
# via
# cryptography
# pynacl
Expand All @@ -26,7 +26,7 @@ code-annotations==1.8.0
# via edx-toggles
cryptography==43.0.0
# via pyjwt
django==4.2.14
django==4.2.15
# via
# -c https://raw.githubusercontent.com/edx/edx-lint/master/edx_lint/files/common_constraints.txt
# -r requirements/base.in
Expand Down Expand Up @@ -79,7 +79,7 @@ jsonschema-specifications==2023.12.1
# via jsonschema
markupsafe==2.1.5
# via jinja2
newrelic==9.12.0
newrelic==9.13.0
# via edx-django-utils
pbr==6.0.0
# via stevedore
Expand All @@ -97,15 +97,15 @@ pynacl==1.5.0
# via edx-django-utils
python-slugify==8.0.4
# via code-annotations
pyyaml==6.0.1
pyyaml==6.0.2
# via code-annotations
referencing==0.35.1
# via
# jsonschema
# jsonschema-specifications
requests==2.32.3
# via edx-drf-extensions
rpds-py==0.19.1
rpds-py==0.20.0
# via
# jsonschema
# referencing
Expand All @@ -128,5 +128,5 @@ urllib3==2.2.2
# via requests

# The following packages are considered to be unsafe in a requirements file:
setuptools==72.1.0
setuptools==73.0.1
# via -r requirements/base.in
4 changes: 2 additions & 2 deletions requirements/ci.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
#
# make upgrade
#
cachetools==5.4.0
cachetools==5.5.0
# via tox
chardet==5.2.0
# via tox
Expand All @@ -28,7 +28,7 @@ pluggy==1.5.0
# via tox
pyproject-api==1.7.1
# via tox
tox==4.16.0
tox==4.18.0
# via -r requirements/ci.in
virtualenv==20.26.3
# via tox
Loading

0 comments on commit 560a6fb

Please sign in to comment.