From eabafb60b71660f652bb5a6c477c8862ae615575 Mon Sep 17 00:00:00 2001 From: Mack Talcott Date: Tue, 26 Dec 2023 23:39:11 -0800 Subject: [PATCH] Fail main task on failure of any subtask (#32) * Fail main task on subtask failures * Save worker logs to rotated log files * Add worker log path to config --- .gitignore | 1 + app/config.py | 2 ++ app/lib/process_duplicates_task.py | 12 +++++++++++- app/server.py | 4 ++-- app/tasks.py | 25 +++++++++++++++++++++++-- log/.gitkeep | 0 6 files changed, 39 insertions(+), 5 deletions(-) create mode 100644 log/.gitkeep diff --git a/.gitignore b/.gitignore index 9b3e803..e6ce689 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ .vscode/settings.json .vscode/tasks.json .env +log/*.log* # WSL Zone.Identifier files *:Zone.Identifier .DS_Store \ No newline at end of file diff --git a/app/config.py b/app/config.py index 3b0c50f..71472d2 100644 --- a/app/config.py +++ b/app/config.py @@ -30,3 +30,5 @@ PROCESS_DUPLICATE_SUBTASK_POLL_INTERVAL = int( os.environ.get("PROCESS_DUPLICATE_SUBTASK_POLL_INTERVAL", 3) ) + +CELERY_WORKER_LOG_PATH = os.path.join("log", "celery_worker.log") diff --git a/app/lib/process_duplicates_task.py b/app/lib/process_duplicates_task.py index eadc6d5..b37d1d7 100644 --- a/app/lib/process_duplicates_task.py +++ b/app/lib/process_duplicates_task.py @@ -45,6 +45,10 @@ class DailyLimitExceededError(Exception): pass +class SubtasksFailedError(Exception): + pass + + class ProcessDuplicatesTask: SUBTASK_BATCH_SIZE = 100 @@ -235,10 +239,16 @@ def _await_subtask_completion(self): raise DailyLimitExceededError( f"Successfully completed {num_successful} of {num_completed} " f"subtasks to store images before exceeding daily baseUrl " - f"request quota. Restart task tomorrow to resume. " + f"request quota. Restart tomorrow to resume. " f"For more details on quota usage, visit " f"https://console.cloud.google.com/apis/api/photoslibrary.googleapis.com/quotas" ) + else: + raise SubtasksFailedError( + f"{num_failed} of {num_total} subtasks failed. " + f"View {app.config.CELERY_WORKER_LOG_PATH} for more details. " + f"Restart to try again." + ) if num_completed == num_total: # All done. diff --git a/app/server.py b/app/server.py index bf8036e..c9dd10c 100644 --- a/app/server.py +++ b/app/server.py @@ -8,7 +8,7 @@ from app import config from app import server # required for building URLs from app.lib.google_api_client import GoogleApiClient -from app.lib.process_duplicates_task import DailyLimitExceededError +from app.lib.process_duplicates_task import DailyLimitExceededError, SubtasksFailedError from app import FLASK_APP as flask_app from app.models.media_items_repository import MediaItemsRepository @@ -74,7 +74,7 @@ def create_task(): return flask.jsonify({"success": True}) -expected_errors = [DailyLimitExceededError] +expected_errors = [DailyLimitExceededError, SubtasksFailedError] @flask_app.route("/api/active_task", methods=["GET"]) diff --git a/app/tasks.py b/app/tasks.py index ace2bca..6b42290 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -1,8 +1,10 @@ -import logging +import logging, logging.handlers +import os import celery -from celery.signals import after_task_publish +from celery.signals import after_task_publish, after_setup_logger from typing import Callable, Optional from app import CELERY_APP as celery_app +from app.config import CELERY_WORKER_LOG_PATH from app.lib.process_duplicates_task import ProcessDuplicatesTask from app.lib.store_images_task import StoreImagesTask @@ -55,6 +57,25 @@ def emit(self, record): # torch.set_num_threads(1) +# Save worker logs to rotated log files +@after_setup_logger.connect +def on_after_setup_logger( + logger, + loglevel, + format, + **kwargs, +): + log_file_handler = logging.handlers.RotatingFileHandler( + CELERY_WORKER_LOG_PATH, + maxBytes=10_000_000, # 10 MB + backupCount=5, + ) + log_file_handler.setLevel(logging.INFO) + log_file_formatter = logging.Formatter(format) + log_file_handler.setFormatter(log_file_formatter) + logger.addHandler(log_file_handler) + + # https://stackoverflow.com/a/10089358 @after_task_publish.connect def update_sent_state(sender=None, headers=None, **kwargs): diff --git a/log/.gitkeep b/log/.gitkeep new file mode 100644 index 0000000..e69de29