From 44ea839e0e1debd5f7ab8a67ed40add1ae38b174 Mon Sep 17 00:00:00 2001 From: Barry Carey Date: Mon, 23 Sep 2024 12:54:17 -0400 Subject: [PATCH] Task runtime tracking --- redditrepostsleuth/adminsvc/inbox_monitor.py | 1 - redditrepostsleuth/core/celery/basetasks.py | 2 + .../core/celery/celeryconfig.py | 2 +- .../celery/task_logic/scheduled_task_logic.py | 4 +- .../core/celery/tasks/scheduled_tasks.py | 106 ++++++++++++++---- .../core/services/eventlogging.py | 10 +- 6 files changed, 96 insertions(+), 29 deletions(-) diff --git a/redditrepostsleuth/adminsvc/inbox_monitor.py b/redditrepostsleuth/adminsvc/inbox_monitor.py index b4c505b..32f043e 100644 --- a/redditrepostsleuth/adminsvc/inbox_monitor.py +++ b/redditrepostsleuth/adminsvc/inbox_monitor.py @@ -36,7 +36,6 @@ def __init__( self.failed_checks = [] def check_inbox(self): - print('[Scheduled Job] Checking Inbox Start') for msg in self.reddit.inbox.messages(limit=500): if msg.author != 'RepostSleuthBot' and msg.subject.lower() in ['false negative', 'false positive']: #self._process_user_report(msg) diff --git a/redditrepostsleuth/core/celery/basetasks.py b/redditrepostsleuth/core/celery/basetasks.py index fed1eeb..e1e3a4a 100644 --- a/redditrepostsleuth/core/celery/basetasks.py +++ b/redditrepostsleuth/core/celery/basetasks.py @@ -1,3 +1,5 @@ +import datetime + from celery import Task from redditrepostsleuth.core.config import Config diff --git a/redditrepostsleuth/core/celery/celeryconfig.py b/redditrepostsleuth/core/celery/celeryconfig.py index c1db7d5..675e0b7 100644 --- a/redditrepostsleuth/core/celery/celeryconfig.py +++ b/redditrepostsleuth/core/celery/celeryconfig.py @@ -29,7 +29,7 @@ 'redditrepostsleuth.core.celery.admin_tasks.update_proxies_job': {'queue': 'scheduled_tasks'}, 'redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans': {'queue': 'onlyfans_check'}, 'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'}, - 'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'}, + #'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'}, 'redditrepostsleuth.core.celery.tasks.reddit_action_tasks.*': {'queue': 'reddit_actions'}, diff --git a/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py b/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py index 2538cf4..043aafe 100644 --- a/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py +++ b/redditrepostsleuth/core/celery/task_logic/scheduled_task_logic.py @@ -239,12 +239,12 @@ def update_monitored_sub_data( monitored_sub.is_private = True if subreddit.subreddit_type == 'private' else False monitored_sub.nsfw = True if subreddit.over18 else False - log.info('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers) + log.debug('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers) perms = get_bot_permissions(subreddit) if monitored_sub.is_mod else [] monitored_sub.post_permission = True if 'all' in perms or 'posts' in perms else None monitored_sub.wiki_permission = True if 'all' in perms or 'wiki' in perms else None - log.info('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission, + log.debug('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission, monitored_sub.wiki_permission) diff --git a/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py b/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py index fe2c1ba..81a0bd9 100644 --- a/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py +++ b/redditrepostsleuth/core/celery/tasks/scheduled_tasks.py @@ -1,4 +1,7 @@ -from praw.exceptions import PRAWException +import datetime +from functools import wraps +from time import perf_counter + from prawcore import TooManyRequests, Redirect, ServerError, NotFound from redditrepostsleuth.adminsvc.bot_comment_monitor import BotCommentMonitor @@ -8,24 +11,70 @@ from redditrepostsleuth.adminsvc.new_activation_monitor import NewActivationMonitor from redditrepostsleuth.core.celery import celery from redditrepostsleuth.core.celery.basetasks import RedditTask, SqlAlchemyTask, AdminTask -from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, update_top_reposts, \ - token_checker, run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts -from redditrepostsleuth.core.db.databasemodels import MonitoredSub, StatsDailyCount +from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, token_checker, \ + run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts +from redditrepostsleuth.core.db.databasemodels import StatsDailyCount from redditrepostsleuth.core.logging import configure_logger from redditrepostsleuth.core.util.helpers import chunk_list -from redditrepostsleuth.core.util.reddithelpers import is_sub_mod_praw, get_bot_permissions -from redditrepostsleuth.core.util.replytemplates import MONITORED_SUB_MOD_REMOVED_CONTENT, \ - MONITORED_SUB_MOD_REMOVED_SUBJECT log = configure_logger( name='redditrepostsleuth', ) +def get_task_influx_points(task_name: str, task_status: str, task_runtime: float): + return ([ + { + 'measurement': 'Scheduled_Task_Updates', + 'time': datetime.datetime.now(datetime.UTC), + 'fields': { + 'run_time': task_runtime + }, + 'tags': { + 'task_name': task_name, + 'task_status': task_status + } + } + ]) + +def record_task_status(func): + @wraps(func) + def dec(self, *args, **kwargs): + if not hasattr(self, 'event_logger'): + log.warning('Task class %s does not have an event logger, cannot record task time', self.__name__) + return func(self, *args, **kwargs) + + try: + task_start = perf_counter() + self.event_logger.write_raw_points( + get_task_influx_points( + func.__name__, + 'started', + 0.0, + ) + ) + func(self, *args, **kwargs) + task_status = 'finished' + + except Exception as e: + log.exception('') + task_status = 'failed' + + self.event_logger.write_raw_points( + get_task_influx_points( + func.__name__, + task_status, + perf_counter() - task_start, + ) + ) + return dec + + @celery.task(bind=True, base=RedditTask) +@record_task_status def check_inbox_task(self) -> None: - print("Checking inbox") log.info('Scheduled Task: Check Inbox') + inbox_monitor = InboxMonitor(self.uowm, self.reddit, self.response_handler) try: inbox_monitor.check_inbox() @@ -35,7 +84,9 @@ def check_inbox_task(self) -> None: log.exception('Failed to update subreddit stats') + @celery.task(bind=True, base=RedditTask) +@record_task_status def check_new_activations_task(self) -> None: log.debug('Scheduled Task: Checking For Activations') activation_monitor = NewActivationMonitor( @@ -53,7 +104,9 @@ def check_new_activations_task(self) -> None: @celery.task(bind=True, base=RedditTask) +@record_task_status def check_comments_for_downvotes_task(self) -> None: + # TODO: Remove, no longer used log.info('Scheduled Task: Check Comment Downvotes') comment_monitor = BotCommentMonitor(self.reddit, self.uowm, self.config, notification_svc=self.notification_svc) try: @@ -62,6 +115,7 @@ def check_comments_for_downvotes_task(self) -> None: log.exception('Failed to update subreddit stats') @celery.task(bind=True, base=RedditTask) +@record_task_status def update_ban_list_task(self) -> None: """ Go through banned subs and see if we're still banned @@ -74,6 +128,7 @@ def update_ban_list_task(self) -> None: @celery.task(bind=True, base=RedditTask) +@record_task_status def update_monitored_sub_data_task(self) -> None: log.debug('Starting Job: Update Subreddit Data') try: @@ -86,6 +141,7 @@ def update_monitored_sub_data_task(self) -> None: @celery.task(bind=True, base=RedditTask) +@record_task_status def remove_expired_bans_task(self) -> None: log.info('Starting Job: Remove Expired Bans') try: @@ -93,18 +149,8 @@ def remove_expired_bans_task(self) -> None: except Exception as e: log.exception('Scheduled Task Failed: Update Mod Status') - -@celery.task(bind=True, base=RedditTask) -def update_top_image_reposts_task(self) -> None: - # TODO: Remove - log.info('Starting Job: Remove Expired Bans') - try: - update_stat_top_image_repost(self.uowm, self.reddit) - except Exception as e: - log.exception('Problem in scheduled task') - - @celery.task(bind=True, base=RedditTask) +@record_task_status def send_reports_to_meme_voting_task(self): log.info('Starting Job: Reports to meme voting') try: @@ -114,6 +160,7 @@ def send_reports_to_meme_voting_task(self): @celery.task(bind=True, base=RedditTask) +@record_task_status def check_meme_template_potential_votes_task(self): log.info('Starting Job: Meme Template Vote') try: @@ -121,7 +168,9 @@ def check_meme_template_potential_votes_task(self): except Exception as e: log.exception('Problem in scheduled task') + @celery.task(bind=True, base=AdminTask, autoretry_for=(TooManyRequests,), retry_kwards={'max_retries': 3}) +@record_task_status def check_for_subreddit_config_update_task(self, subreddit_name: str) -> None: with self.uowm.start() as uow: @@ -142,7 +191,12 @@ def check_for_subreddit_config_update_task(self, subreddit_name: str) -> None: log.exception('') @celery.task(bind=True, base=RedditTask) +@record_task_status def queue_config_updates_task(self): + """ + Goes through each registered subreddit and queues a job to check their Wiki config for updates + :param self: + """ log.info('Starting Job: Config Update Check') try: print('[Scheduled Job] Queue config update check') @@ -158,6 +212,7 @@ def queue_config_updates_task(self): @celery.task(bind=True, base=SqlAlchemyTask) +@record_task_status def update_daily_stats(self): log.info('[Daily Stat Update] Started') daily_stats = StatsDailyCount() @@ -180,6 +235,7 @@ def update_daily_stats(self): @celery.task(bind=True, base=SqlAlchemyTask) +@record_task_status def update_all_top_reposts_task(self): try: with self.uowm.start() as uow: @@ -188,6 +244,7 @@ def update_all_top_reposts_task(self): log.exception('Unknown task error') @celery.task(bind=True, base=SqlAlchemyTask) +@record_task_status def update_all_top_reposters_task(self): try: with self.uowm.start() as uow: @@ -196,6 +253,7 @@ def update_all_top_reposters_task(self): log.exception('Unknown task error') @celery.task(bind=True, base=SqlAlchemyTask) +@record_task_status def update_daily_top_reposters_task(self): post_types = [1, 2, 3] try: @@ -207,6 +265,7 @@ def update_daily_top_reposters_task(self): @celery.task(bind=True, base=RedditTask, autoretry_for=(TooManyRequests,), retry_kwards={'max_retries': 3}) +@record_task_status def update_monitored_sub_stats_task(self, sub_name: str) -> None: try: with self.uowm.start() as uow: @@ -225,6 +284,7 @@ def update_monitored_sub_stats_task(self, sub_name: str) -> None: log.exception('') @celery.task(bind=True, base=SqlAlchemyTask) +@record_task_status def update_proxies_task(self) -> None: log.info('Starting proxy update') try: @@ -233,12 +293,9 @@ def update_proxies_task(self) -> None: except Exception as e: log.exception('Failed to update proxies') -@celery.task -def update_profile_token_task(): - print('Staring token checker') - token_checker() @celery.task(bind=True, base=SqlAlchemyTask) +@record_task_status def delete_search_batch(self, ids: list[int]): try: with self.uowm.start() as uow: @@ -254,6 +311,7 @@ def delete_search_batch(self, ids: list[int]): log.exception('') @celery.task(bind=True, base=SqlAlchemyTask) +@record_task_status def queue_search_history_cleanup(self): with self.uowm.start() as uow: searches = uow.repost_search.get_all_ids_older_than_days(120, limit=100000000) @@ -263,4 +321,4 @@ def queue_search_history_cleanup(self): log.info('Queuing Search History Cleanup. Range: ID Range: %s:%s', searches[0].id, searches[-1].id) ids = [x[0] for x in searches] for chunk in chunk_list(ids, 5000): - delete_search_batch.apply_async((chunk,), queue='batch_delete_searches') \ No newline at end of file + delete_search_batch.apply_async((chunk,)) \ No newline at end of file diff --git a/redditrepostsleuth/core/services/eventlogging.py b/redditrepostsleuth/core/services/eventlogging.py index 2f179d7..4b72cdb 100644 --- a/redditrepostsleuth/core/services/eventlogging.py +++ b/redditrepostsleuth/core/services/eventlogging.py @@ -82,4 +82,12 @@ def _write_to_influx(self, event: InfluxEvent) -> bool: self._unsaved_events.append(event) log.error('Failed To Write To InfluxDB', exc_info=True) log.error(event.get_influx_event()) - return False \ No newline at end of file + return False + + def write_raw_points(self, points: list[dict]): + try: + self._influx_client.write(bucket=self._config.influx_bucket, record=points) + except Exception as e: + log.exception('Failed to write to Influx') + + log.info('Wrote Influx: %s', points) \ No newline at end of file