Skip to content

Commit a0bbe06

Browse files
Re-queue interrupted Celery tasks (#5800)
Co-authored-by: Robert Keyser <[email protected]>
1 parent 64c6692 commit a0bbe06

File tree

14 files changed

+396
-91
lines changed

14 files changed

+396
-91
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ Changes can also be flagged with a GitHub label for tracking purposes. The URL o
2626
- Added support for queue-specific Celery workers [#5761](https://github.com/ethyca/fides/pull/5761)
2727
- Added support for AWS SES as an email provider [#5804](https://github.com/ethyca/fides/pull/5804)
2828
- Nested identity query support for BigQuery [#5814](https://github.com/ethyca/fides/pull/5814)
29+
- Added job that automatically requeues interrupted tasks for in progress privacy requests [#5800](https://github.com/ethyca/fides/pull/5800)
2930

3031
### Changed
3132
- Improved dataset validation for namespace metadata and dataset reachability [#5744](https://github.com/ethyca/fides/pull/5744)

docker-compose.yml

+12
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,18 @@ services:
191191
target: /usr/local/etc/redis
192192
read_only: False
193193

194+
flower:
195+
image: mher/flower:2.0
196+
command: celery flower --broker=redis://:redispassword@redis:6379/0
197+
ports:
198+
- "5555:5555"
199+
depends_on:
200+
- redis
201+
- worker
202+
environment:
203+
- CELERY_BROKER_URL=redis://:redispassword@redis:6379/0
204+
- CELERY_RESULT_BACKEND=redis://:redispassword@redis:6379/0
205+
194206
volumes:
195207
postgres: null
196208

noxfiles/dev_nox.py

+14-13
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def shell(session: Session) -> None:
3636
)
3737

3838

39+
# pylint: disable=too-many-branches
3940
@nox_session()
4041
def dev(session: Session) -> None:
4142
"""
@@ -51,6 +52,7 @@ def dev(session: Session) -> None:
5152
- pc = Build and run the Privacy Center
5253
- remote_debug = Run with remote debugging enabled (see docker-compose.remote-debug.yml)
5354
- worker = Run a Fides worker
55+
- flower = Run Flower monitoring dashboard for Celery
5456
- child = Run a Fides child node
5557
- <datastore(s)> = Run a test datastore (e.g. 'mssql', 'mongodb')
5658
@@ -61,21 +63,20 @@ def dev(session: Session) -> None:
6163
build(session, "dev")
6264
session.notify("teardown")
6365

64-
if "worker" in session.posargs:
65-
session.run("docker", "compose", "up", "--wait", "worker", external=True)
66+
workers = ["worker", "worker-privacy-preferences", "worker-dsr"]
6667

67-
if "worker-privacy-preferences" in session.posargs:
68-
session.run(
69-
"docker",
70-
"compose",
71-
"up",
72-
"--wait",
73-
"worker-privacy-preferences",
74-
external=True,
75-
)
68+
for worker in workers:
69+
if worker in session.posargs:
70+
session.run("docker", "compose", "up", "--wait", worker, external=True)
7671

77-
if "worker-dsr" in session.posargs:
78-
session.run("docker", "compose", "up", "--wait", "worker-dsr", external=True)
72+
if "flower" in session.posargs:
73+
# Only start Flower if worker is also enabled
74+
if any(worker in session.posargs for worker in workers):
75+
session.run("docker", "compose", "up", "-d", "flower", external=True)
76+
else:
77+
session.error(
78+
"Flower requires the worker service. Please add 'worker' to your arguments."
79+
)
7980

8081
datastores = [
8182
datastore for datastore in session.posargs if datastore in ALL_DATASTORES

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ fastapi[all]==0.111.0
1919
fastapi-pagination[sqlalchemy]==0.12.25
2020
fideslog==1.2.10
2121
firebase-admin==5.3.0
22+
flower==2.0.1
2223
GitPython==3.1.41
2324
httpx==0.23.1
2425
hvac==0.11.2

src/fides/api/api/v1/endpoints/privacy_request_endpoints.py

+11-71
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@
8484
)
8585
from fides.api.schemas.dataset import CollectionAddressResponse, DryRunDatasetResponse
8686
from fides.api.schemas.external_https import PrivacyRequestResumeFormat
87-
from fides.api.schemas.policy import ActionType, CurrentStep
87+
from fides.api.schemas.policy import ActionType
8888
from fides.api.schemas.privacy_request import (
8989
BulkPostPrivacyRequests,
9090
BulkReviewResponse,
@@ -170,7 +170,10 @@
170170
)
171171
from fides.service.messaging.messaging_service import MessagingService
172172
from fides.service.privacy_request.privacy_request_service import (
173+
PrivacyRequestError,
173174
PrivacyRequestService,
175+
_process_privacy_request_restart,
176+
_requeue_privacy_request,
174177
_trigger_pre_approval_webhooks,
175178
queue_privacy_request,
176179
)
@@ -1867,35 +1870,6 @@ def resume_privacy_request_from_requires_input(
18671870
return privacy_request # type: ignore[return-value]
18681871

18691872

1870-
def _process_privacy_request_restart(
1871-
privacy_request: PrivacyRequest,
1872-
failed_step: Optional[CurrentStep],
1873-
db: Session,
1874-
) -> PrivacyRequestResponse:
1875-
"""If failed_step is provided, restart the DSR within that step. Otherwise,
1876-
restart the privacy request from the beginning."""
1877-
if failed_step:
1878-
logger.info(
1879-
"Restarting failed privacy request '{}' from '{}'",
1880-
privacy_request.id,
1881-
failed_step,
1882-
)
1883-
else:
1884-
logger.info(
1885-
"Restarting failed privacy request '{}' from the beginning",
1886-
privacy_request.id,
1887-
)
1888-
1889-
privacy_request.status = PrivacyRequestStatus.in_processing
1890-
privacy_request.save(db=db)
1891-
queue_privacy_request(
1892-
privacy_request_id=privacy_request.id,
1893-
from_step=failed_step.value if failed_step else None,
1894-
)
1895-
1896-
return privacy_request # type: ignore[return-value]
1897-
1898-
18991873
@router.get(
19001874
REQUEST_TASKS,
19011875
dependencies=[Security(verify_oauth_client, scopes=[PRIVACY_REQUEST_READ])],
@@ -1936,52 +1910,18 @@ def requeue_privacy_request(
19361910
19371911
Don't use this unless the Privacy Request is stuck.
19381912
"""
1939-
pr: PrivacyRequest = get_privacy_request_or_error(db, privacy_request_id)
1913+
privacy_request: PrivacyRequest = get_privacy_request_or_error(
1914+
db, privacy_request_id
1915+
)
19401916

1941-
if pr.status not in [
1942-
PrivacyRequestStatus.approved,
1943-
PrivacyRequestStatus.in_processing,
1944-
]:
1917+
try:
1918+
return _requeue_privacy_request(db, privacy_request)
1919+
except PrivacyRequestError as exc:
19451920
raise HTTPException(
19461921
status_code=HTTP_400_BAD_REQUEST,
1947-
detail=f"Request failed. Cannot re-queue privacy request {pr.id} with status {pr.status.value}",
1922+
detail=exc.message,
19481923
)
19491924

1950-
# Both DSR 2.0 and 3.0 cache checkpoint details
1951-
checkpoint_details: Optional[CheckpointActionRequired] = (
1952-
pr.get_failed_checkpoint_details()
1953-
)
1954-
resume_step = checkpoint_details.step if checkpoint_details else None
1955-
1956-
# DSR 3.0 additionally stores Request Tasks in the application db that can be used to infer
1957-
# a resume checkpoint in the event the cache has expired.
1958-
if not resume_step and pr.request_tasks.count():
1959-
if pr.consent_tasks.count():
1960-
resume_step = CurrentStep.consent
1961-
elif pr.erasure_tasks.count():
1962-
# Checking if access terminator task was completed, because erasure tasks are created
1963-
# at the same time as the access tasks
1964-
terminator_access_task = pr.get_terminate_task_by_action(ActionType.access)
1965-
resume_step = (
1966-
CurrentStep.erasure
1967-
if terminator_access_task.status == ExecutionLogStatus.complete
1968-
else CurrentStep.access
1969-
)
1970-
elif pr.access_tasks.count():
1971-
resume_step = CurrentStep.access
1972-
1973-
logger.info(
1974-
"Manually re-queuing Privacy Request {} from step {}",
1975-
pr,
1976-
resume_step.value if resume_step else None,
1977-
)
1978-
1979-
return _process_privacy_request_restart(
1980-
pr,
1981-
resume_step,
1982-
db,
1983-
)
1984-
19851925

19861926
@router.post(
19871927
REQUEST_TASK_CALLBACK,

src/fides/api/main.py

+5
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,13 @@
4040
initiate_scheduled_batch_email_send,
4141
)
4242
from fides.api.service.privacy_request.request_service import (
43+
initiate_interrupted_task_requeue_poll,
4344
initiate_poll_for_exited_privacy_request_tasks,
4445
initiate_scheduled_dsr_data_removal,
4546
)
47+
48+
# pylint: disable=wildcard-import, unused-wildcard-import
49+
from fides.api.service.saas_request.override_implementations import *
4650
from fides.api.tasks.scheduled.scheduler import async_scheduler, scheduler
4751
from fides.api.ui import (
4852
get_admin_index_as_response,
@@ -91,6 +95,7 @@ async def lifespan(wrapped_app: FastAPI) -> AsyncGenerator[None, None]:
9195
initiate_scheduled_batch_email_send()
9296
initiate_poll_for_exited_privacy_request_tasks()
9397
initiate_scheduled_dsr_data_removal()
98+
initiate_interrupted_task_requeue_poll()
9499
initiate_bcrypt_migration_task()
95100

96101
logger.debug("Sending startup analytics events...")

0 commit comments

Comments
 (0)