From c8befe10a26eba5bf4db67dcc516eac99de2c03e Mon Sep 17 00:00:00 2001 From: Mack Talcott Date: Tue, 26 Dec 2023 21:58:27 -0800 Subject: [PATCH 1/3] Fail main task on subtask failures --- app/lib/process_duplicates_task.py | 12 +++++++++++- app/server.py | 4 ++-- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/app/lib/process_duplicates_task.py b/app/lib/process_duplicates_task.py index eadc6d5..cc98568 100644 --- a/app/lib/process_duplicates_task.py +++ b/app/lib/process_duplicates_task.py @@ -45,6 +45,10 @@ class DailyLimitExceededError(Exception): pass +class SubtasksFailedError(Exception): + pass + + class ProcessDuplicatesTask: SUBTASK_BATCH_SIZE = 100 @@ -235,10 +239,16 @@ def _await_subtask_completion(self): raise DailyLimitExceededError( f"Successfully completed {num_successful} of {num_completed} " f"subtasks to store images before exceeding daily baseUrl " - f"request quota. Restart task tomorrow to resume. " + f"request quota. Restart tomorrow to resume. " f"For more details on quota usage, visit " f"https://console.cloud.google.com/apis/api/photoslibrary.googleapis.com/quotas" ) + else: + raise SubtasksFailedError( + f"{num_failed} of {num_total} subtasks failed. " + f"Restart to try again. " + f" " + ) if num_completed == num_total: # All done. diff --git a/app/server.py b/app/server.py index bf8036e..c9dd10c 100644 --- a/app/server.py +++ b/app/server.py @@ -8,7 +8,7 @@ from app import config from app import server # required for building URLs from app.lib.google_api_client import GoogleApiClient -from app.lib.process_duplicates_task import DailyLimitExceededError +from app.lib.process_duplicates_task import DailyLimitExceededError, SubtasksFailedError from app import FLASK_APP as flask_app from app.models.media_items_repository import MediaItemsRepository @@ -74,7 +74,7 @@ def create_task(): return flask.jsonify({"success": True}) -expected_errors = [DailyLimitExceededError] +expected_errors = [DailyLimitExceededError, SubtasksFailedError] @flask_app.route("/api/active_task", methods=["GET"]) From 63401f7618f2645c4ab3d12107748e5e0c3237c3 Mon Sep 17 00:00:00 2001 From: Mack Talcott Date: Tue, 26 Dec 2023 23:12:11 -0800 Subject: [PATCH 2/3] Save worker logs to rotated log files --- .gitignore | 1 + app/tasks.py | 24 ++++++++++++++++++++++-- log/.gitkeep | 0 3 files changed, 23 insertions(+), 2 deletions(-) create mode 100644 log/.gitkeep diff --git a/.gitignore b/.gitignore index 9b3e803..e6ce689 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ __pycache__/ .vscode/settings.json .vscode/tasks.json .env +log/*.log* # WSL Zone.Identifier files *:Zone.Identifier .DS_Store \ No newline at end of file diff --git a/app/tasks.py b/app/tasks.py index ace2bca..0d68116 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -1,6 +1,7 @@ -import logging +import logging, logging.handlers +import os import celery -from celery.signals import after_task_publish +from celery.signals import after_task_publish, after_setup_logger from typing import Callable, Optional from app import CELERY_APP as celery_app @@ -55,6 +56,25 @@ def emit(self, record): # torch.set_num_threads(1) +# Save worker logs to rotated log files +@after_setup_logger.connect +def on_after_setup_logger( + logger, + loglevel, + format, + **kwargs, +): + log_file_handler = logging.handlers.RotatingFileHandler( + os.path.join("log", "celery_worker.log"), + maxBytes=10_000_000, # 10 MB + backupCount=5, + ) + log_file_handler.setLevel(logging.INFO) + log_file_formatter = logging.Formatter(format) + log_file_handler.setFormatter(log_file_formatter) + logger.addHandler(log_file_handler) + + # https://stackoverflow.com/a/10089358 @after_task_publish.connect def update_sent_state(sender=None, headers=None, **kwargs): diff --git a/log/.gitkeep b/log/.gitkeep new file mode 100644 index 0000000..e69de29 From 3d70c61977a3d81418004e63a0b59b8623b2848d Mon Sep 17 00:00:00 2001 From: Mack Talcott Date: Tue, 26 Dec 2023 23:19:43 -0800 Subject: [PATCH 3/3] Add worker log path to config --- app/config.py | 2 ++ app/lib/process_duplicates_task.py | 4 ++-- app/tasks.py | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/app/config.py b/app/config.py index 3b0c50f..71472d2 100644 --- a/app/config.py +++ b/app/config.py @@ -30,3 +30,5 @@ PROCESS_DUPLICATE_SUBTASK_POLL_INTERVAL = int( os.environ.get("PROCESS_DUPLICATE_SUBTASK_POLL_INTERVAL", 3) ) + +CELERY_WORKER_LOG_PATH = os.path.join("log", "celery_worker.log") diff --git a/app/lib/process_duplicates_task.py b/app/lib/process_duplicates_task.py index cc98568..b37d1d7 100644 --- a/app/lib/process_duplicates_task.py +++ b/app/lib/process_duplicates_task.py @@ -246,8 +246,8 @@ def _await_subtask_completion(self): else: raise SubtasksFailedError( f"{num_failed} of {num_total} subtasks failed. " - f"Restart to try again. " - f" " + f"View {app.config.CELERY_WORKER_LOG_PATH} for more details. " + f"Restart to try again." ) if num_completed == num_total: diff --git a/app/tasks.py b/app/tasks.py index 0d68116..6b42290 100644 --- a/app/tasks.py +++ b/app/tasks.py @@ -4,6 +4,7 @@ from celery.signals import after_task_publish, after_setup_logger from typing import Callable, Optional from app import CELERY_APP as celery_app +from app.config import CELERY_WORKER_LOG_PATH from app.lib.process_duplicates_task import ProcessDuplicatesTask from app.lib.store_images_task import StoreImagesTask @@ -65,7 +66,7 @@ def on_after_setup_logger( **kwargs, ): log_file_handler = logging.handlers.RotatingFileHandler( - os.path.join("log", "celery_worker.log"), + CELERY_WORKER_LOG_PATH, maxBytes=10_000_000, # 10 MB backupCount=5, )