Skip to content

Commit

Permalink
Fail main task on failure of any subtask (#32)
Browse files Browse the repository at this point in the history
* Fail main task on subtask failures
* Save worker logs to rotated log files
* Add worker log path to config
  • Loading branch information
mtalcott authored Dec 27, 2023
1 parent 3f4515c commit eabafb6
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 5 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ __pycache__/
.vscode/settings.json
.vscode/tasks.json
.env
log/*.log*
# WSL Zone.Identifier files
*:Zone.Identifier
.DS_Store
2 changes: 2 additions & 0 deletions app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,5 @@
PROCESS_DUPLICATE_SUBTASK_POLL_INTERVAL = int(
os.environ.get("PROCESS_DUPLICATE_SUBTASK_POLL_INTERVAL", 3)
)

CELERY_WORKER_LOG_PATH = os.path.join("log", "celery_worker.log")
12 changes: 11 additions & 1 deletion app/lib/process_duplicates_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ class DailyLimitExceededError(Exception):
pass


class SubtasksFailedError(Exception):
pass


class ProcessDuplicatesTask:
SUBTASK_BATCH_SIZE = 100

Expand Down Expand Up @@ -235,10 +239,16 @@ def _await_subtask_completion(self):
raise DailyLimitExceededError(
f"Successfully completed {num_successful} of {num_completed} "
f"subtasks to store images before exceeding daily baseUrl "
f"request quota. Restart task tomorrow to resume. "
f"request quota. Restart tomorrow to resume. "
f"For more details on quota usage, visit "
f"https://console.cloud.google.com/apis/api/photoslibrary.googleapis.com/quotas"
)
else:
raise SubtasksFailedError(
f"{num_failed} of {num_total} subtasks failed. "
f"View {app.config.CELERY_WORKER_LOG_PATH} for more details. "
f"Restart to try again."
)

if num_completed == num_total:
# All done.
Expand Down
4 changes: 2 additions & 2 deletions app/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from app import config
from app import server # required for building URLs
from app.lib.google_api_client import GoogleApiClient
from app.lib.process_duplicates_task import DailyLimitExceededError
from app.lib.process_duplicates_task import DailyLimitExceededError, SubtasksFailedError
from app import FLASK_APP as flask_app
from app.models.media_items_repository import MediaItemsRepository

Expand Down Expand Up @@ -74,7 +74,7 @@ def create_task():
return flask.jsonify({"success": True})


expected_errors = [DailyLimitExceededError]
expected_errors = [DailyLimitExceededError, SubtasksFailedError]


@flask_app.route("/api/active_task", methods=["GET"])
Expand Down
25 changes: 23 additions & 2 deletions app/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import logging
import logging, logging.handlers
import os
import celery
from celery.signals import after_task_publish
from celery.signals import after_task_publish, after_setup_logger
from typing import Callable, Optional
from app import CELERY_APP as celery_app
from app.config import CELERY_WORKER_LOG_PATH

from app.lib.process_duplicates_task import ProcessDuplicatesTask
from app.lib.store_images_task import StoreImagesTask
Expand Down Expand Up @@ -55,6 +57,25 @@ def emit(self, record):
# torch.set_num_threads(1)


# Save worker logs to rotated log files
@after_setup_logger.connect
def on_after_setup_logger(
logger,
loglevel,
format,
**kwargs,
):
log_file_handler = logging.handlers.RotatingFileHandler(
CELERY_WORKER_LOG_PATH,
maxBytes=10_000_000, # 10 MB
backupCount=5,
)
log_file_handler.setLevel(logging.INFO)
log_file_formatter = logging.Formatter(format)
log_file_handler.setFormatter(log_file_formatter)
logger.addHandler(log_file_handler)


# https://stackoverflow.com/a/10089358
@after_task_publish.connect
def update_sent_state(sender=None, headers=None, **kwargs):
Expand Down
Empty file added log/.gitkeep
Empty file.

0 comments on commit eabafb6

Please sign in to comment.