Skip to content

Commit

Permalink
Merge pull request #381 from barrycarey/feature/task-tracking
Browse files Browse the repository at this point in the history
Task runtime tracking
  • Loading branch information
barrycarey authored Sep 23, 2024
2 parents c48b5ac + 44ea839 commit d87265c
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 29 deletions.
1 change: 0 additions & 1 deletion redditrepostsleuth/adminsvc/inbox_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def __init__(
self.failed_checks = []

def check_inbox(self):
print('[Scheduled Job] Checking Inbox Start')
for msg in self.reddit.inbox.messages(limit=500):
if msg.author != 'RepostSleuthBot' and msg.subject.lower() in ['false negative', 'false positive']:
#self._process_user_report(msg)
Expand Down
2 changes: 2 additions & 0 deletions redditrepostsleuth/core/celery/basetasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import datetime

from celery import Task

from redditrepostsleuth.core.config import Config
Expand Down
2 changes: 1 addition & 1 deletion redditrepostsleuth/core/celery/celeryconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
'redditrepostsleuth.core.celery.admin_tasks.update_proxies_job': {'queue': 'scheduled_tasks'},
'redditrepostsleuth.core.celery.admin_tasks.check_user_for_only_fans': {'queue': 'onlyfans_check'},
'redditrepostsleuth.core.celery.admin_tasks.update_subreddit_config_from_database': {'queue': 'update_wiki_from_database'},
'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
#'redditrepostsleuth.core.celery.admin_tasks.delete_search_batch': {'queue': 'batch_delete_searches'},
'redditrepostsleuth.core.celery.tasks.reddit_action_tasks.*': {'queue': 'reddit_actions'},


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,12 @@ def update_monitored_sub_data(

monitored_sub.is_private = True if subreddit.subreddit_type == 'private' else False
monitored_sub.nsfw = True if subreddit.over18 else False
log.info('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers)
log.debug('[Subscriber Update] %s: %s subscribers', monitored_sub.name, monitored_sub.subscribers)

perms = get_bot_permissions(subreddit) if monitored_sub.is_mod else []
monitored_sub.post_permission = True if 'all' in perms or 'posts' in perms else None
monitored_sub.wiki_permission = True if 'all' in perms or 'wiki' in perms else None
log.info('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission,
log.debug('[Mod Check] %s | Post Perm: %s | Wiki Perm: %s', monitored_sub.name, monitored_sub.post_permission,
monitored_sub.wiki_permission)


Expand Down
106 changes: 82 additions & 24 deletions redditrepostsleuth/core/celery/tasks/scheduled_tasks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from praw.exceptions import PRAWException
import datetime
from functools import wraps
from time import perf_counter

from prawcore import TooManyRequests, Redirect, ServerError, NotFound

from redditrepostsleuth.adminsvc.bot_comment_monitor import BotCommentMonitor
Expand All @@ -8,24 +11,70 @@
from redditrepostsleuth.adminsvc.new_activation_monitor import NewActivationMonitor
from redditrepostsleuth.core.celery import celery
from redditrepostsleuth.core.celery.basetasks import RedditTask, SqlAlchemyTask, AdminTask
from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, update_top_reposts, \
token_checker, run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
from redditrepostsleuth.core.db.databasemodels import MonitoredSub, StatsDailyCount
from redditrepostsleuth.core.celery.task_logic.scheduled_task_logic import update_proxies, token_checker, \
run_update_top_reposters, update_top_reposters, update_monitored_sub_data, run_update_top_reposts
from redditrepostsleuth.core.db.databasemodels import StatsDailyCount
from redditrepostsleuth.core.logging import configure_logger
from redditrepostsleuth.core.util.helpers import chunk_list
from redditrepostsleuth.core.util.reddithelpers import is_sub_mod_praw, get_bot_permissions
from redditrepostsleuth.core.util.replytemplates import MONITORED_SUB_MOD_REMOVED_CONTENT, \
MONITORED_SUB_MOD_REMOVED_SUBJECT

log = configure_logger(
name='redditrepostsleuth',
)

def get_task_influx_points(task_name: str, task_status: str, task_runtime: float):
return ([
{
'measurement': 'Scheduled_Task_Updates',
'time': datetime.datetime.now(datetime.UTC),
'fields': {
'run_time': task_runtime
},
'tags': {
'task_name': task_name,
'task_status': task_status
}
}
])

def record_task_status(func):
@wraps(func)
def dec(self, *args, **kwargs):
if not hasattr(self, 'event_logger'):
log.warning('Task class %s does not have an event logger, cannot record task time', self.__name__)
return func(self, *args, **kwargs)

try:
task_start = perf_counter()
self.event_logger.write_raw_points(
get_task_influx_points(
func.__name__,
'started',
0.0,
)
)
func(self, *args, **kwargs)
task_status = 'finished'

except Exception as e:
log.exception('')
task_status = 'failed'

self.event_logger.write_raw_points(
get_task_influx_points(
func.__name__,
task_status,
perf_counter() - task_start,
)
)
return dec



@celery.task(bind=True, base=RedditTask)
@record_task_status
def check_inbox_task(self) -> None:
print("Checking inbox")
log.info('Scheduled Task: Check Inbox')

inbox_monitor = InboxMonitor(self.uowm, self.reddit, self.response_handler)
try:
inbox_monitor.check_inbox()
Expand All @@ -35,7 +84,9 @@ def check_inbox_task(self) -> None:
log.exception('Failed to update subreddit stats')



@celery.task(bind=True, base=RedditTask)
@record_task_status
def check_new_activations_task(self) -> None:
log.debug('Scheduled Task: Checking For Activations')
activation_monitor = NewActivationMonitor(
Expand All @@ -53,7 +104,9 @@ def check_new_activations_task(self) -> None:


@celery.task(bind=True, base=RedditTask)
@record_task_status
def check_comments_for_downvotes_task(self) -> None:
# TODO: Remove, no longer used
log.info('Scheduled Task: Check Comment Downvotes')
comment_monitor = BotCommentMonitor(self.reddit, self.uowm, self.config, notification_svc=self.notification_svc)
try:
Expand All @@ -62,6 +115,7 @@ def check_comments_for_downvotes_task(self) -> None:
log.exception('Failed to update subreddit stats')

@celery.task(bind=True, base=RedditTask)
@record_task_status
def update_ban_list_task(self) -> None:
"""
Go through banned subs and see if we're still banned
Expand All @@ -74,6 +128,7 @@ def update_ban_list_task(self) -> None:


@celery.task(bind=True, base=RedditTask)
@record_task_status
def update_monitored_sub_data_task(self) -> None:
log.debug('Starting Job: Update Subreddit Data')
try:
Expand All @@ -86,25 +141,16 @@ def update_monitored_sub_data_task(self) -> None:


@celery.task(bind=True, base=RedditTask)
@record_task_status
def remove_expired_bans_task(self) -> None:
log.info('Starting Job: Remove Expired Bans')
try:
remove_expired_bans(self.uowm, self.notification_svc)
except Exception as e:
log.exception('Scheduled Task Failed: Update Mod Status')


@celery.task(bind=True, base=RedditTask)
def update_top_image_reposts_task(self) -> None:
# TODO: Remove
log.info('Starting Job: Remove Expired Bans')
try:
update_stat_top_image_repost(self.uowm, self.reddit)
except Exception as e:
log.exception('Problem in scheduled task')


@celery.task(bind=True, base=RedditTask)
@record_task_status
def send_reports_to_meme_voting_task(self):
log.info('Starting Job: Reports to meme voting')
try:
Expand All @@ -114,14 +160,17 @@ def send_reports_to_meme_voting_task(self):


@celery.task(bind=True, base=RedditTask)
@record_task_status
def check_meme_template_potential_votes_task(self):
log.info('Starting Job: Meme Template Vote')
try:
check_meme_template_potential_votes(self.uowm)
except Exception as e:
log.exception('Problem in scheduled task')


@celery.task(bind=True, base=AdminTask, autoretry_for=(TooManyRequests,), retry_kwards={'max_retries': 3})
@record_task_status
def check_for_subreddit_config_update_task(self, subreddit_name: str) -> None:
with self.uowm.start() as uow:

Expand All @@ -142,7 +191,12 @@ def check_for_subreddit_config_update_task(self, subreddit_name: str) -> None:
log.exception('')

@celery.task(bind=True, base=RedditTask)
@record_task_status
def queue_config_updates_task(self):
"""
Goes through each registered subreddit and queues a job to check their Wiki config for updates
:param self:
"""
log.info('Starting Job: Config Update Check')
try:
print('[Scheduled Job] Queue config update check')
Expand All @@ -158,6 +212,7 @@ def queue_config_updates_task(self):


@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_daily_stats(self):
log.info('[Daily Stat Update] Started')
daily_stats = StatsDailyCount()
Expand All @@ -180,6 +235,7 @@ def update_daily_stats(self):


@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_all_top_reposts_task(self):
try:
with self.uowm.start() as uow:
Expand All @@ -188,6 +244,7 @@ def update_all_top_reposts_task(self):
log.exception('Unknown task error')

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_all_top_reposters_task(self):
try:
with self.uowm.start() as uow:
Expand All @@ -196,6 +253,7 @@ def update_all_top_reposters_task(self):
log.exception('Unknown task error')

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_daily_top_reposters_task(self):
post_types = [1, 2, 3]
try:
Expand All @@ -207,6 +265,7 @@ def update_daily_top_reposters_task(self):


@celery.task(bind=True, base=RedditTask, autoretry_for=(TooManyRequests,), retry_kwards={'max_retries': 3})
@record_task_status
def update_monitored_sub_stats_task(self, sub_name: str) -> None:
try:
with self.uowm.start() as uow:
Expand All @@ -225,6 +284,7 @@ def update_monitored_sub_stats_task(self, sub_name: str) -> None:
log.exception('')

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def update_proxies_task(self) -> None:
log.info('Starting proxy update')
try:
Expand All @@ -233,12 +293,9 @@ def update_proxies_task(self) -> None:
except Exception as e:
log.exception('Failed to update proxies')

@celery.task
def update_profile_token_task():
print('Staring token checker')
token_checker()

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def delete_search_batch(self, ids: list[int]):
try:
with self.uowm.start() as uow:
Expand All @@ -254,6 +311,7 @@ def delete_search_batch(self, ids: list[int]):
log.exception('')

@celery.task(bind=True, base=SqlAlchemyTask)
@record_task_status
def queue_search_history_cleanup(self):
with self.uowm.start() as uow:
searches = uow.repost_search.get_all_ids_older_than_days(120, limit=100000000)
Expand All @@ -263,4 +321,4 @@ def queue_search_history_cleanup(self):
log.info('Queuing Search History Cleanup. Range: ID Range: %s:%s', searches[0].id, searches[-1].id)
ids = [x[0] for x in searches]
for chunk in chunk_list(ids, 5000):
delete_search_batch.apply_async((chunk,), queue='batch_delete_searches')
delete_search_batch.apply_async((chunk,))
10 changes: 9 additions & 1 deletion redditrepostsleuth/core/services/eventlogging.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,12 @@ def _write_to_influx(self, event: InfluxEvent) -> bool:
self._unsaved_events.append(event)
log.error('Failed To Write To InfluxDB', exc_info=True)
log.error(event.get_influx_event())
return False
return False

def write_raw_points(self, points: list[dict]):
try:
self._influx_client.write(bucket=self._config.influx_bucket, record=points)
except Exception as e:
log.exception('Failed to write to Influx')

log.info('Wrote Influx: %s', points)

0 comments on commit d87265c

Please sign in to comment.